Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ab#961 create counting algorithm linecrossing #4

Merged
merged 13 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions application/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ spring.flyway.placeholder-replacement=false
springdoc.swagger-ui.csrf.enabled=true

# Configuration
analytics.dataRetrievalRate=1000
analytics.jobRunInterval=2000
analytics.maxDataInterval=10000

sae.datasource.url=jdbc:postgresql://localhost:5435/sae
sae.datasource.username=sae
sae.datasource.password=sae
sae.detection.tablename=detection
sae.datasource.url=jdbc:postgresql://localhost:5432/postgres
sae.datasource.username=postgres
sae.datasource.password=postgres
sae.detection.tablename=data

analytics.datasource.username=analytics
analytics.datasource.password=analytics
Expand All @@ -52,4 +53,6 @@ analytics.datasource.flyway.locations=classpath:db/migration/analytics,classpath


# logging.level.org.springframework.security=DEBUG
# logging.level.org.springframework.web=DEBUG
# logging.level.org.springframework.web=DEBUG

# spring.jpa.show-sql=true
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import org.springframework.transaction.annotation.Transactional;

import de.starwit.persistence.analytics.entity.LineCrossingEntity;
import de.starwit.persistence.sae.entity.SaeCountEntity;
import de.starwit.persistence.sae.entity.SaeDetectionEntity;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;

Expand All @@ -23,24 +21,22 @@ EntityManager getEntityManager() {
}

@Transactional("analyticsTransactionManager")
public void insert(SaeDetectionEntity entity) {
// TODO
String insertString = "insert into linecrossing(crossingtime, parkingareaid, objectid, objectclassid) values(:crossingtime, :parkingareaid, :objectid, :classId)";

/*
* getEntityManager().createNativeQuery(insertString)
* .setParameter("crossingtime", entity.getCaptureTs())
* .setParameter("parkingareaid", 1)
* .setParameter("objectid", "unknown")
* .setParameter("classId", entity.getObjectClassId())
* .executeUpdate();
*/
}
public void insert(LineCrossingEntity entity) {
String insertString = "insert into linecrossing(crossingtime, parkingareaid, objectid, direction, objectclassid) values(:crossingtime, :parkingareaid, :objectid, :direction, :classId)";

em.createNativeQuery(insertString)
.setParameter("crossingtime", entity.getCrossingTime())
.setParameter("parkingareaid", entity.getParkingAreaId())
.setParameter("objectid", entity.getObjectId())
.setParameter("direction", entity.getDirection().toString())
.setParameter("classId", entity.getObjectClassId())
.executeUpdate();
}

// TODO
public List<LineCrossingEntity> findFirst100() {
String queryString = "select * from linecrossing order by crossingtime desc limit 100";
return getEntityManager().createNativeQuery(queryString).getResultList();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,37 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import de.starwit.persistence.sae.entity.SaeDetectionEntity;
import de.starwit.persistence.sae.entity.SaeDetectionRowMapper;

@Component
public class SaeDao {

@Value("${sae.detection.tablename}")
private static String hyperTableName;

private static String getDetectionDataSql = "select * from "
+ hyperTableName
+ " where \"capture_ts\" > ? "
+ "and \"camera_id\" = ? "
+ "and \"class_id\" = ?"
+ "order by \"capture_ts\" ASC";
private String getDetectionDataSql;

@Autowired
@Qualifier("saeJdbcTemplate")
private JdbcTemplate saeJdbcTemplate;

@Value("${sae.detectionsTableName}")
private String detectionTableName;

Logger log = LoggerFactory.getLogger(this.getClass());

public SaeDao(@Value("${sae.detection.tablename}") String hyperTableName) {
this.getDetectionDataSql = "select * from "
+ hyperTableName
+ " where \"capture_ts\" > ? "
+ "and \"camera_id\" = ? "
+ "and \"class_id\" = ? "
+ "order by \"capture_ts\" ASC";
}

public List<SaeDetectionEntity> getDetectionData(Instant lastRetrievedTime, String cameraId,
Integer detectionClassId) {
try {
LocalDateTime ldt = LocalDateTime.ofInstant(lastRetrievedTime, java.time.ZoneId.systemDefault());
List<SaeDetectionEntity> result = saeJdbcTemplate.query(getDetectionDataSql,
new SaeDetectionRowMapper(), ldt, cameraId, detectionClassId);
log.info("count in getDetectingData{}", result.size());
return result;
} catch (Exception e) {
log.error("Error in getDetectionData", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package de.starwit.persistence.sae.repository;

import java.time.Instant;
import java.util.Arrays;
import java.util.List;

import org.springframework.beans.factory.annotation.Value;
Expand All @@ -21,18 +22,36 @@ public class SaeRepository {
@Value("${sae.detection.tablename}")
private String hyperTableName;

public List<SaeCountEntity> getDetectionData(Instant lastRetrievedTime, String cameraId,
public List<SaeCountEntity> getCountData(Instant lastRetrievedTime, String cameraId,
Integer detectionClassId) {

String getDetectionData = ""
String queryString = ""
+ "select capture_ts, class_id, count(object_id)"
+ " from " + hyperTableName
+ " where capture_ts > :capturets"
+ " and camera_id = :cameraid"
+ " and class_id = :classid"
+ " group by capture_ts, class_id";

Query q = em.createNativeQuery(getDetectionData, SaeCountEntity.class);
Query q = em.createNativeQuery(queryString, SaeCountEntity.class);
q.setParameter("capturets", lastRetrievedTime);
q.setParameter("cameraid", cameraId);
q.setParameter("classid", detectionClassId);
return q.getResultList();
}

public List<SaeDetectionEntity> getDetectionData(Instant lastRetrievedTime, String cameraId,
Integer detectionClassId) {

String queryString = ""
+ "select *"
+ " from " + hyperTableName
+ " where capture_ts > :capturets"
+ " and camera_id = :cameraid"
+ " and class_id = :classid"
+ " order by capture_ts asc";

Query q = em.createNativeQuery(queryString, SaeDetectionEntity.class);
q.setParameter("capturets", lastRetrievedTime);
q.setParameter("cameraid", cameraId);
q.setParameter("classid", detectionClassId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package de.starwit.service.analytics;

import java.time.ZoneId;
import java.time.ZonedDateTime;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import de.starwit.persistence.analytics.entity.Direction;
import de.starwit.persistence.analytics.entity.LineCrossingEntity;
import de.starwit.persistence.analytics.repository.LineCrossingRepository;
import de.starwit.persistence.sae.entity.SaeDetectionEntity;

Expand All @@ -19,7 +24,13 @@ public class LineCrossingService {
private LineCrossingRepository linecrossingRepository;

@Transactional
public void addEntry(SaeDetectionEntity entity) {
public void addEntry(SaeDetectionEntity det, Long parkingAreaId, Direction direction) {
LineCrossingEntity entity = new LineCrossingEntity();
entity.setCrossingTime(ZonedDateTime.ofInstant(det.getCaptureTs(), ZoneId.systemDefault()));
entity.setDirection(direction);
entity.setObjectClassId(det.getClassId());
entity.setObjectId(det.getObjectId());
entity.setParkingAreaId(parkingAreaId);
linecrossingRepository.insert(entity);
}
}
18 changes: 13 additions & 5 deletions service/src/main/java/de/starwit/service/jobs/AbstractJob.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
package de.starwit.service.jobs;

import java.time.Instant;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import de.starwit.persistence.common.entity.AbstractCaptureEntity;

public abstract class AbstractJob<E extends AbstractCaptureEntity> {

@Value("${analytics.maxDataInterval:10000}")
int maxDataInterval;

final Logger log = LoggerFactory.getLogger(this.getClass());

public void getAndProcessNewData(JobData<E> jobData) throws InterruptedException {
public void run(JobData<E> jobData) throws InterruptedException {

if (jobData.getLastRetrievedTime().isBefore(Instant.now().minusMillis(maxDataInterval))) {
jobData.setLastRetrievedTime(Instant.now().minusMillis(maxDataInterval));
}

List<E> newData = this.getData(jobData);

Expand All @@ -20,8 +30,8 @@ public void getAndProcessNewData(JobData<E> jobData) throws InterruptedException
if (newData != null && !newData.isEmpty()) {
jobData.setLastRetrievedTime(newData.get(newData.size() - 1).getCaptureTs());

for (int i = newData.size() - 1; i >= 0; i--) {
success = jobData.getInputData().offer(newData.get(i));
for (E dataPoint : newData) {
success = jobData.getInputData().offer(dataPoint);
if (!success) {
discardCount++;
}
Expand All @@ -32,8 +42,6 @@ public void getAndProcessNewData(JobData<E> jobData) throws InterruptedException
}

this.process(jobData);

// Pass data to database output / writer
}

abstract List<E> getData(JobData<E> jobData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

Expand All @@ -18,9 +17,6 @@ public class AnalyticsJobCreator {

Logger log = LoggerFactory.getLogger(this.getClass());

@Value("${analytics.dataRetrievalRate:2000}")
private int dataRetrievalRate;

private List<JobData> jobsToRun = null;

@Autowired
Expand All @@ -32,7 +28,7 @@ public class AnalyticsJobCreator {
@Autowired
private LineCrossingJob lineCrossingJob;

@Scheduled(initialDelay = 0, fixedRate = 1000000)
@Scheduled(initialDelay = 0, fixedRate = 10000)
private void refreshJobs() {
log.info("in refreshJobs");
jobsToRun = new ArrayList<>();
Expand All @@ -42,27 +38,26 @@ private void refreshJobs() {
}
}

@Scheduled(initialDelay = 1000, fixedRate = 10000)
@Scheduled(initialDelay = 1000, fixedRateString = "${analytics.jobRunInterval:10000}")
private void runJobs() {
log.info("in runJobs");
try {
if (jobsToRun != null && !jobsToRun.isEmpty()) {
for (JobData job : jobsToRun) {
if (jobsToRun != null && !jobsToRun.isEmpty()) {
for (JobData job : jobsToRun) {
try {
log.debug("Running job: {}", job.getConfig().getName());
switch (job.getConfig().getType()) {
case LINE_CROSSING:
lineCrossingJob.getAndProcessNewData(job);
lineCrossingJob.run(job);
break;
case AREA_OCCUPANCY:
areaOccupancyJob.getAndProcessNewData(job);
areaOccupancyJob.run(job);
break;
default:
break;
}
log.debug("Running job: {}", job.getConfig().getName());
} catch (Exception e) {
log.error("Exception during job run {}", job.getConfig().getName(), e);
}
}
} catch (Exception e) {
log.error("Exception during running the job", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package de.starwit.service.jobs;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.Queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -21,17 +21,17 @@ public class AreaOccupancyJob extends AbstractJob<SaeCountEntity> {

@Override
List<SaeCountEntity> getData(JobData<SaeCountEntity> jobData) {
return saeRepository.getDetectionData(jobData.getLastRetrievedTime(),
return saeRepository.getCountData(jobData.getLastRetrievedTime(),
jobData.getConfig().getCameraId(),
jobData.getConfig().getDetectionClassId());
}

@Override
void process(JobData<SaeCountEntity> jobData) throws InterruptedException {
if (jobData != null) {
ArrayBlockingQueue<SaeCountEntity> queue = jobData.getInputData();
Queue<SaeCountEntity> queue = jobData.getInputData();
while (queue != null && !queue.isEmpty()) {
areaOccupancyService.addEntry(queue.take());
areaOccupancyService.addEntry(queue.poll());
}
}
}
Expand Down
11 changes: 4 additions & 7 deletions service/src/main/java/de/starwit/service/jobs/JobData.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,28 @@
package de.starwit.service.jobs;

import java.time.Instant;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

import de.starwit.persistence.common.entity.AbstractCaptureEntity;
import de.starwit.persistence.databackend.entity.AnalyticsJobEntity;

public class JobData<E extends AbstractCaptureEntity> {

private ArrayBlockingQueue<E> inputData;
private Queue<E> inputData;
private final AnalyticsJobEntity config;
private Instant lastRetrievedTime;

JobData(AnalyticsJobEntity config) {
this.inputData = new ArrayBlockingQueue<>(100);
this.inputData = new ArrayBlockingQueue<>(500);
this.config = config;
this.lastRetrievedTime = Instant.now();
}

public ArrayBlockingQueue<E> getInputData() {
public Queue<E> getInputData() {
return inputData;
}

public void setInputData(ArrayBlockingQueue<E> inputData) {
this.inputData = inputData;
}

public AnalyticsJobEntity getConfig() {
return config;
}
Expand Down
Loading
Loading