Skip to content

Commit

Permalink
feat: Pass stored webhook to pipeline execution
Browse files Browse the repository at this point in the history
When passing a webhook URL to the pipeline execution request, the server
will request it after its completion.
This webhook is typically the openpath trips fetching. Thanks to this,
we will now be able to retrieve the trips as soon as they are produced.
  • Loading branch information
paultranvan committed Jan 19, 2024
1 parent 83ff75e commit d76a9e7
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 29 deletions.
52 changes: 47 additions & 5 deletions src/app/domain/geolocation/tracking/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ import BackgroundGeolocation from 'react-native-background-geolocation'

import Minilog from 'cozy-minilog'

import { uploadData } from '/app/domain/geolocation/tracking/upload'
import {
runOpenPathPipeline,
uploadData
} from '/app/domain/geolocation/tracking/upload'
import { StorageKeys, storeData, getData } from '/libs/localStore/storage'
import { Log } from '/app/domain/geolocation/helpers'
import { saveActivity } from '/app/domain/geolocation/tracking/tracking'
import {
clearAllData,
getFetchServiceWebHook,
getFlagFailUpload
} from '/app/domain/geolocation/tracking/storage'
import { t } from '/locales/i18n'
Expand All @@ -21,13 +25,14 @@ import {
WALKING_ACTIVITY,
LOW_CONFIDENCE_THRESHOLD
} from '/app/domain/geolocation/tracking/consts'

export { Log, getAllLogs, sendLogFile } from '/app/domain/geolocation/helpers'
export { getOrCreateId, updateId } from '/app/domain/geolocation/tracking/user'
export { uploadData } from '/app/domain/geolocation/tracking/upload'
export { GeolocationTrackingHeadlessTask } from '/app/domain/geolocation/tracking/headless'
export { storeFetchServiceWebHook } from '/app/domain/geolocation/tracking/storage'
import { GeolocationTrackingEmitter } from '/app/domain/geolocation/tracking/events'
import { TRIP_END } from '/app/domain/geolocation/tracking/consts'
import { getOrCreateId } from '/app/domain/geolocation/tracking/user'

export {
clearAllCozyGPSMemoryData,
Expand Down Expand Up @@ -102,7 +107,7 @@ export const stopTracking = async () => {
await BackgroundGeolocation.stop()
await storeData(StorageKeys.ShouldBeTrackingFlagStorageAdress, false)
Log('Turned off tracking, uploading...')
await uploadData({ force: true }) // Forced end, but if fails no current solution (won't retry until turned back on)
await startOpenPathUploadAndPipeline({ force: true }) // Forced end, but no retry if it fails
} else {
Log('Tracking already off')
}
Expand Down Expand Up @@ -188,8 +193,9 @@ export const handleMotionChange = async event => {
// Get the event timestamp to filter out locations tracked after this
const stationaryTs = event.location?.timestamp
Log('Auto uploading from stop')
await uploadData({ untilTs: stationaryTs })
await startOpenPathUploadAndPipeline({ untilTs: stationaryTs })
GeolocationTrackingEmitter.emit(TRIP_END)

// Disable elasticity to improve next point accuracy
disableElasticity()
}
Expand All @@ -201,11 +207,47 @@ export const handleConnectivityChange = async event => {
// Does not work with iOS emulator, event.connected is always false
if (event.connected && (await getFlagFailUpload())) {
Log('Auto uploading from reconnection and failed last attempt')
await uploadData()
await startOpenPathUploadAndPipeline()
GeolocationTrackingEmitter.emit(TRIP_END)
}
}

/**
* Upload data and run openpath pipeline
* This is the complete openpath process that should eventually
* produce new trips that will be retrieved by the coachco2 service
* after completion of the pipeline.
* A webhook is passed to the openpath server so the service it started
* as soon as the trips are ready to be fetched.
*
* @typedef {object} Params
* @property {number} untilTS - Until when the locations points should be fetched. Default is 0
* @property {boolean} force - Whether or not the upload should be forced
*
* @param {Params} - The method parmas
*/
export const startOpenPathUploadAndPipeline = async ({
untilTs = 0,
force = false
}) => {
try {
const user = await getOrCreateId()
// Upload data to openpath server
const uploadedCount = await uploadData(user, { untilTs, force })
if (uploadedCount >= 0) {
// Start openpath pipeline
const webhook = (await getFetchServiceWebHook()) || ''
const resp = await runOpenPathPipeline(user, webhook)
if (resp?.ok) {
Log('Pipeline successfully run')
}
}
} catch (err) {
Log('Failed openpath processing: ' + JSON.stringify(err))
}
return
}

// Register on activity change
BackgroundGeolocation.onActivityChange(async event => {
return handleActivityChange(event)
Expand Down
8 changes: 8 additions & 0 deletions src/app/domain/geolocation/tracking/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ export const getFlagFailUpload = async () => {
}
}

export const getFetchServiceWebHook = async () => {
return await getData(StorageKeys.ServiceWebhookURL)
}

export const storeFetchServiceWebHook = async webhookURL => {
return storeData(StorageKeys.ServiceWebhookURL, webhookURL)
}

export const getId = async () => {
return await getData(StorageKeys.IdStorageAdress)
}
Expand Down
24 changes: 16 additions & 8 deletions src/app/domain/geolocation/tracking/tracking.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import {
runOpenPathPipeline,
uploadUserCache
} from '/app/domain/geolocation/tracking/upload'
import { uploadUserCache } from '/app/domain/geolocation/tracking/upload'
import { getOrCreateUser } from '/app/domain/geolocation/tracking/user'
import { getTs, Log, parseISOString } from '/app/domain/geolocation/helpers'
import {
Expand Down Expand Up @@ -47,14 +44,25 @@ export const createDataBatch = (locations, nRun, maxBatchSize) => {
return batchLocations
}

// Future entry point of algorithm
// prepareTrackingData / extractTrackingDate
export const smartSend = async (locations, user, { force = false } = {}) => {
/**
* Upload local tracking data to the server
*
* @param {Array<Location>} locations - The location points to upload
* @param {string} user - The openpath user id
* @param {{boolean}} - Additional params
* @returns {number} The number of uploaded tracking data
*/
export const uploadTrackingData = async (
locations,
user,
{ force = false } = {}
) => {
await getOrCreateUser(user)

if (!locations || locations.length === 0) {
Log('No new locations')
await uploadWithNoNewPoints({ user, force })
return 0
} else {
Log('Found pending locations, uploading: ' + locations.length)
const lastUploadedPoint = await getLastPointUploaded()
Expand All @@ -77,7 +85,7 @@ export const smartSend = async (locations, user, { force = false } = {}) => {
}
}
Log('Uploaded last batch')
await runOpenPathPipeline(user)
return locations.length
}
}

Expand Down
28 changes: 13 additions & 15 deletions src/app/domain/geolocation/tracking/upload.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import BackgroundGeolocation from 'react-native-background-geolocation'

import { smartSend } from '/app/domain/geolocation/tracking/tracking'
import { getOrCreateId } from '/app/domain/geolocation/tracking/user'
import { uploadTrackingData } from '/app/domain/geolocation/tracking/tracking'
import { Log } from '/app/domain/geolocation/helpers'
import { storeFlagFailUpload } from '/app/domain/geolocation/tracking/storage'
import { utf8ByteSize } from '/app/domain/geolocation/tracking/utils'
Expand Down Expand Up @@ -48,11 +47,14 @@ export const uploadUserCache = async (content, user) => {
return { ok: true }
}

export const runOpenPathPipeline = async user => {
Log('Request to run pipeline')
export const runOpenPathPipeline = async (user, webhook = '') => {
Log(`Request to run pipeline with webhook : ${webhook}`)
const request = {
user: user
}
if (webhook) {
request.webhook = webhook
}
const body = JSON.stringify(request)
const response = await fetch(SERVER_URL + '/cozy/run/pipeline', {
method: 'POST',
Expand Down Expand Up @@ -86,10 +88,8 @@ const filterBadContent = contentToUpload => {
})
}

export const uploadData = async ({ untilTs = 0, force = false } = {}) => {
// WARNING: la valeur de retour (booleen) indique le succès, mais mal géré dans le retryOnFail (actuellement uniquement utilisé pour le bouton "Forcer l'upload" avecec force et pas de retry)

Log('Starting upload process' + (force ? ', forced' : ''))
export const uploadData = async (user, { untilTs = 0, force = false } = {}) => {
Log(`Starting upload process for user ${user} ${force ? 'forced' : ''}`)

try {
const locations = await BackgroundGeolocation.getLocations()
Expand All @@ -103,18 +103,16 @@ export const uploadData = async ({ untilTs = 0, force = false } = {}) => {
if (filteredLocations.length < locations.length) {
Log('Locations filtered: ' + filteredLocations.length - locations.length)
}

let user = await getOrCreateId()
Log('Using Id: ' + user)

try {
await smartSend(filteredLocations, user, { force })
const uploadedCount = await uploadTrackingData(filteredLocations, user, {
force
})
await storeFlagFailUpload(false)
return true
return uploadedCount
} catch (message) {
Log('Error trying to send data: ' + message)
await storeFlagFailUpload(true)
return false
return -1
}
} catch (error) {
throw new Error(error)
Expand Down
3 changes: 2 additions & 1 deletion src/libs/localStore/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ export enum StorageKeys {
LastStartTransitionTsKey = 'CozyGPSMemory.LastStartTransitionTsKey',
GeolocationTrackingConfig = 'CozyGPSMemory.TrackingConfig',
Activities = 'CozyGPSMemory.Activities',
FirstTimeserie = 'CozyGPSMemory.FirstTimeserie'
FirstTimeserie = 'CozyGPSMemory.FirstTimeserie',
ServiceWebhookURL = 'CozyGPSMemory.ServiceWebhookURL'
}

export type IconsCache = Record<string, { version: string; xml: string }>
Expand Down

0 comments on commit d76a9e7

Please sign in to comment.