Skip to content

Commit

Permalink
Merge pull request #4 from starwit/feature/AB#961-create-counting-alg…
Browse files Browse the repository at this point in the history
…orithm-linecrossing

Feature/ab#961 create counting algorithm linecrossing
  • Loading branch information
witchpou authored Dec 14, 2023
2 parents c1260dc + ccf6160 commit 6eeb621
Show file tree
Hide file tree
Showing 12 changed files with 390 additions and 74 deletions.
15 changes: 9 additions & 6 deletions application/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ spring.flyway.placeholder-replacement=false
springdoc.swagger-ui.csrf.enabled=true

# Configuration
analytics.dataRetrievalRate=1000
analytics.jobRunInterval=2000
analytics.maxDataInterval=10000

sae.datasource.url=jdbc:postgresql://localhost:5435/sae
sae.datasource.username=sae
sae.datasource.password=sae
sae.detection.tablename=detection
sae.datasource.url=jdbc:postgresql://localhost:5432/postgres
sae.datasource.username=postgres
sae.datasource.password=postgres
sae.detection.tablename=data

analytics.datasource.username=analytics
analytics.datasource.password=analytics
Expand All @@ -52,4 +53,6 @@ analytics.datasource.flyway.locations=classpath:db/migration/analytics,classpath


# logging.level.org.springframework.security=DEBUG
# logging.level.org.springframework.web=DEBUG
# logging.level.org.springframework.web=DEBUG

# spring.jpa.show-sql=true
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import org.springframework.transaction.annotation.Transactional;

import de.starwit.persistence.analytics.entity.LineCrossingEntity;
import de.starwit.persistence.sae.entity.SaeCountEntity;
import de.starwit.persistence.sae.entity.SaeDetectionEntity;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;

Expand All @@ -23,24 +21,22 @@ EntityManager getEntityManager() {
}

@Transactional("analyticsTransactionManager")
public void insert(SaeDetectionEntity entity) {
// TODO
String insertString = "insert into linecrossing(crossingtime, parkingareaid, objectid, objectclassid) values(:crossingtime, :parkingareaid, :objectid, :classId)";

/*
* getEntityManager().createNativeQuery(insertString)
* .setParameter("crossingtime", entity.getCaptureTs())
* .setParameter("parkingareaid", 1)
* .setParameter("objectid", "unknown")
* .setParameter("classId", entity.getObjectClassId())
* .executeUpdate();
*/
}
public void insert(LineCrossingEntity entity) {
String insertString = "insert into linecrossing(crossingtime, parkingareaid, objectid, direction, objectclassid) values(:crossingtime, :parkingareaid, :objectid, :direction, :classId)";

em.createNativeQuery(insertString)
.setParameter("crossingtime", entity.getCrossingTime())
.setParameter("parkingareaid", entity.getParkingAreaId())
.setParameter("objectid", entity.getObjectId())
.setParameter("direction", entity.getDirection().toString())
.setParameter("classId", entity.getObjectClassId())
.executeUpdate();
}

// TODO
public List<LineCrossingEntity> findFirst100() {
String queryString = "select * from linecrossing order by crossingtime desc limit 100";
return getEntityManager().createNativeQuery(queryString).getResultList();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,37 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import de.starwit.persistence.sae.entity.SaeDetectionEntity;
import de.starwit.persistence.sae.entity.SaeDetectionRowMapper;

@Component
public class SaeDao {

@Value("${sae.detection.tablename}")
private static String hyperTableName;

private static String getDetectionDataSql = "select * from "
+ hyperTableName
+ " where \"capture_ts\" > ? "
+ "and \"camera_id\" = ? "
+ "and \"class_id\" = ?"
+ "order by \"capture_ts\" ASC";
private String getDetectionDataSql;

@Autowired
@Qualifier("saeJdbcTemplate")
private JdbcTemplate saeJdbcTemplate;

@Value("${sae.detectionsTableName}")
private String detectionTableName;

Logger log = LoggerFactory.getLogger(this.getClass());

public SaeDao(@Value("${sae.detection.tablename}") String hyperTableName) {
this.getDetectionDataSql = "select * from "
+ hyperTableName
+ " where \"capture_ts\" > ? "
+ "and \"camera_id\" = ? "
+ "and \"class_id\" = ? "
+ "order by \"capture_ts\" ASC";
}

public List<SaeDetectionEntity> getDetectionData(Instant lastRetrievedTime, String cameraId,
Integer detectionClassId) {
try {
LocalDateTime ldt = LocalDateTime.ofInstant(lastRetrievedTime, java.time.ZoneId.systemDefault());
List<SaeDetectionEntity> result = saeJdbcTemplate.query(getDetectionDataSql,
new SaeDetectionRowMapper(), ldt, cameraId, detectionClassId);
log.info("count in getDetectingData{}", result.size());
return result;
} catch (Exception e) {
log.error("Error in getDetectionData", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package de.starwit.persistence.sae.repository;

import java.time.Instant;
import java.util.Arrays;
import java.util.List;

import org.springframework.beans.factory.annotation.Value;
Expand All @@ -21,18 +22,36 @@ public class SaeRepository {
@Value("${sae.detection.tablename}")
private String hyperTableName;

public List<SaeCountEntity> getDetectionData(Instant lastRetrievedTime, String cameraId,
public List<SaeCountEntity> getCountData(Instant lastRetrievedTime, String cameraId,
Integer detectionClassId) {

String getDetectionData = ""
String queryString = ""
+ "select capture_ts, class_id, count(object_id)"
+ " from " + hyperTableName
+ " where capture_ts > :capturets"
+ " and camera_id = :cameraid"
+ " and class_id = :classid"
+ " group by capture_ts, class_id";

Query q = em.createNativeQuery(getDetectionData, SaeCountEntity.class);
Query q = em.createNativeQuery(queryString, SaeCountEntity.class);
q.setParameter("capturets", lastRetrievedTime);
q.setParameter("cameraid", cameraId);
q.setParameter("classid", detectionClassId);
return q.getResultList();
}

public List<SaeDetectionEntity> getDetectionData(Instant lastRetrievedTime, String cameraId,
Integer detectionClassId) {

String queryString = ""
+ "select *"
+ " from " + hyperTableName
+ " where capture_ts > :capturets"
+ " and camera_id = :cameraid"
+ " and class_id = :classid"
+ " order by capture_ts asc";

Query q = em.createNativeQuery(queryString, SaeDetectionEntity.class);
q.setParameter("capturets", lastRetrievedTime);
q.setParameter("cameraid", cameraId);
q.setParameter("classid", detectionClassId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package de.starwit.service.analytics;

import java.time.ZoneId;
import java.time.ZonedDateTime;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import de.starwit.persistence.analytics.entity.Direction;
import de.starwit.persistence.analytics.entity.LineCrossingEntity;
import de.starwit.persistence.analytics.repository.LineCrossingRepository;
import de.starwit.persistence.sae.entity.SaeDetectionEntity;

Expand All @@ -19,7 +24,13 @@ public class LineCrossingService {
private LineCrossingRepository linecrossingRepository;

@Transactional
public void addEntry(SaeDetectionEntity entity) {
public void addEntry(SaeDetectionEntity det, Long parkingAreaId, Direction direction) {
LineCrossingEntity entity = new LineCrossingEntity();
entity.setCrossingTime(ZonedDateTime.ofInstant(det.getCaptureTs(), ZoneId.systemDefault()));
entity.setDirection(direction);
entity.setObjectClassId(det.getClassId());
entity.setObjectId(det.getObjectId());
entity.setParkingAreaId(parkingAreaId);
linecrossingRepository.insert(entity);
}
}
18 changes: 13 additions & 5 deletions service/src/main/java/de/starwit/service/jobs/AbstractJob.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
package de.starwit.service.jobs;

import java.time.Instant;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import de.starwit.persistence.common.entity.AbstractCaptureEntity;

public abstract class AbstractJob<E extends AbstractCaptureEntity> {

@Value("${analytics.maxDataInterval:10000}")
int maxDataInterval;

final Logger log = LoggerFactory.getLogger(this.getClass());

public void getAndProcessNewData(JobData<E> jobData) throws InterruptedException {
public void run(JobData<E> jobData) throws InterruptedException {

if (jobData.getLastRetrievedTime().isBefore(Instant.now().minusMillis(maxDataInterval))) {
jobData.setLastRetrievedTime(Instant.now().minusMillis(maxDataInterval));
}

List<E> newData = this.getData(jobData);

Expand All @@ -20,8 +30,8 @@ public void getAndProcessNewData(JobData<E> jobData) throws InterruptedException
if (newData != null && !newData.isEmpty()) {
jobData.setLastRetrievedTime(newData.get(newData.size() - 1).getCaptureTs());

for (int i = newData.size() - 1; i >= 0; i--) {
success = jobData.getInputData().offer(newData.get(i));
for (E dataPoint : newData) {
success = jobData.getInputData().offer(dataPoint);
if (!success) {
discardCount++;
}
Expand All @@ -32,8 +42,6 @@ public void getAndProcessNewData(JobData<E> jobData) throws InterruptedException
}

this.process(jobData);

// Pass data to database output / writer
}

abstract List<E> getData(JobData<E> jobData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

Expand All @@ -18,9 +17,6 @@ public class AnalyticsJobCreator {

Logger log = LoggerFactory.getLogger(this.getClass());

@Value("${analytics.dataRetrievalRate:2000}")
private int dataRetrievalRate;

private List<JobData> jobsToRun = null;

@Autowired
Expand All @@ -32,7 +28,7 @@ public class AnalyticsJobCreator {
@Autowired
private LineCrossingJob lineCrossingJob;

@Scheduled(initialDelay = 0, fixedRate = 1000000)
@Scheduled(initialDelay = 0, fixedRate = 10000)
private void refreshJobs() {
log.info("in refreshJobs");
jobsToRun = new ArrayList<>();
Expand All @@ -42,27 +38,26 @@ private void refreshJobs() {
}
}

@Scheduled(initialDelay = 1000, fixedRate = 10000)
@Scheduled(initialDelay = 1000, fixedRateString = "${analytics.jobRunInterval:10000}")
private void runJobs() {
log.info("in runJobs");
try {
if (jobsToRun != null && !jobsToRun.isEmpty()) {
for (JobData job : jobsToRun) {
if (jobsToRun != null && !jobsToRun.isEmpty()) {
for (JobData job : jobsToRun) {
try {
log.debug("Running job: {}", job.getConfig().getName());
switch (job.getConfig().getType()) {
case LINE_CROSSING:
lineCrossingJob.getAndProcessNewData(job);
lineCrossingJob.run(job);
break;
case AREA_OCCUPANCY:
areaOccupancyJob.getAndProcessNewData(job);
areaOccupancyJob.run(job);
break;
default:
break;
}
log.debug("Running job: {}", job.getConfig().getName());
} catch (Exception e) {
log.error("Exception during job run {}", job.getConfig().getName(), e);
}
}
} catch (Exception e) {
log.error("Exception during running the job", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package de.starwit.service.jobs;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.Queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -21,17 +21,17 @@ public class AreaOccupancyJob extends AbstractJob<SaeCountEntity> {

@Override
List<SaeCountEntity> getData(JobData<SaeCountEntity> jobData) {
return saeRepository.getDetectionData(jobData.getLastRetrievedTime(),
return saeRepository.getCountData(jobData.getLastRetrievedTime(),
jobData.getConfig().getCameraId(),
jobData.getConfig().getDetectionClassId());
}

@Override
void process(JobData<SaeCountEntity> jobData) throws InterruptedException {
if (jobData != null) {
ArrayBlockingQueue<SaeCountEntity> queue = jobData.getInputData();
Queue<SaeCountEntity> queue = jobData.getInputData();
while (queue != null && !queue.isEmpty()) {
areaOccupancyService.addEntry(queue.take());
areaOccupancyService.addEntry(queue.poll());
}
}
}
Expand Down
11 changes: 4 additions & 7 deletions service/src/main/java/de/starwit/service/jobs/JobData.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,28 @@
package de.starwit.service.jobs;

import java.time.Instant;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

import de.starwit.persistence.common.entity.AbstractCaptureEntity;
import de.starwit.persistence.databackend.entity.AnalyticsJobEntity;

public class JobData<E extends AbstractCaptureEntity> {

private ArrayBlockingQueue<E> inputData;
private Queue<E> inputData;
private final AnalyticsJobEntity config;
private Instant lastRetrievedTime;

JobData(AnalyticsJobEntity config) {
this.inputData = new ArrayBlockingQueue<>(100);
this.inputData = new ArrayBlockingQueue<>(500);
this.config = config;
this.lastRetrievedTime = Instant.now();
}

public ArrayBlockingQueue<E> getInputData() {
public Queue<E> getInputData() {
return inputData;
}

public void setInputData(ArrayBlockingQueue<E> inputData) {
this.inputData = inputData;
}

public AnalyticsJobEntity getConfig() {
return config;
}
Expand Down
Loading

0 comments on commit 6eeb621

Please sign in to comment.