Skip to content

Commit

Permalink
Add Luftdaten.info feeder
Browse files Browse the repository at this point in the history
  • Loading branch information
3cky committed Jan 22, 2020
1 parent c153e09 commit 9cf9790
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 21 deletions.
94 changes: 83 additions & 11 deletions feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package main

import (
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
"time"

Expand All @@ -39,37 +41,37 @@ func NewOpenAirFeeder(apiServerUrl string, measurementsKeepDuration time.Duratio
}
}

func (oae *OpenAirFeeder) Feed(data *StationData) {
func (oaf *OpenAirFeeder) Feed(data *StationData) {
// Delete expired buffered measurements
now := time.Now()
for {
if len(oae.measurements) == 0 {
if len(oaf.measurements) == 0 {
break
}

// Stop on first unexpired buffered measurement
t := oae.measurements[0].Timestamp
if t != nil && now.Sub(time.Time(*t)) < oae.measurementsKeepDuration {
t := oaf.measurements[0].Timestamp
if t != nil && now.Sub(time.Time(*t)) < oaf.measurementsKeepDuration {
break
}

// Remove expired buffered measurement
oae.measurements = oae.measurements[1:]
oaf.measurements = oaf.measurements[1:]
}

// Add last data measurement to buffered measurements
oae.measurements = append(oae.measurements, *data.LastMeasurement)
oaf.measurements = append(oaf.measurements, *data.LastMeasurement)

f := api.FeederData{
TokenId: data.TokenId,
Version: data.Version,
Measurements: oae.measurements,
Measurements: oaf.measurements,
}

log.Debugf("[OpenAir] posting %d measurement(s) to %s", len(oae.measurements), oae.apiServerUrl)
log.Debugf("[OpenAir] posting %d measurement(s) to %s", len(oaf.measurements), oaf.apiServerUrl)

var r api.Result
if err := HttpPostData(oae.apiServerUrl, f, &r); err != nil {
if err := HttpPostData(oaf.apiServerUrl, nil, f, &r); err != nil {
log.Errorf("[OpenAir] data posting failed: %v", err)
return
}
Expand All @@ -78,8 +80,78 @@ func (oae *OpenAirFeeder) Feed(data *StationData) {
return
}

log.Debugf("[OpenAir] successfully posted %d measurement(s) to %s", len(oae.measurements), oae.apiServerUrl)
log.Debugf("[OpenAir] successfully posted %d measurement(s) to %s", len(oaf.measurements), oaf.apiServerUrl)

// Delete successfully posted buffered measurements
oae.measurements = nil
oaf.measurements = nil
}

type LuftdatenFeeder struct {
apiServerUrl string
sensorDataPostInterval time.Duration

lastSensorDataPostTime time.Time
}

type LuftdatenSensorData struct {
SoftwareVersion string `json:"software_version"`
SensorDataValues []LuftdatenSensorDataValue `json:"sensordatavalues"`
}
type LuftdatenSensorDataValue struct {
ValueType string `json:"value_type"`
Value float32 `json:"value"`
}

func NewLuftdatenFeeder() *LuftdatenFeeder {
return &LuftdatenFeeder{
apiServerUrl: "https://api.luftdaten.info/v1/push-sensor-data/",
sensorDataPostInterval: 3 * time.Minute,
}
}

func (lf *LuftdatenFeeder) Feed(data *StationData) {
sensorId := fmt.Sprintf("raspi-%s", data.TokenId[:12])

if time.Since(lf.lastSensorDataPostTime) < lf.sensorDataPostInterval {
log.Debugf("[Luftdaten] %s: skip sensor data posting", sensorId)
return
}

pmSensorData := &LuftdatenSensorData{
SoftwareVersion: data.Version,
SensorDataValues: []LuftdatenSensorDataValue{
{ValueType: "P1", Value: Float32RefRound(data.LastMeasurement.Pm10, 1)},
{ValueType: "P2", Value: Float32RefRound(data.LastMeasurement.Pm25, 1)},
},
}
lf.postSensorData(sensorId, 1, pmSensorData)

envSensorData := &LuftdatenSensorData{
SoftwareVersion: data.Version,
SensorDataValues: []LuftdatenSensorDataValue{
{ValueType: "temperature", Value: Float32RefRound(data.LastMeasurement.Temperature, 1)},
{ValueType: "humidity", Value: Float32RefRound(data.LastMeasurement.Humidity, 1)},
{ValueType: "pressure", Value: 100 * Float32RefRound(data.LastMeasurement.Pressure, 2)},
},
}
lf.postSensorData(sensorId, 11, envSensorData)

lf.lastSensorDataPostTime = time.Now()
}

func (lf *LuftdatenFeeder) postSensorData(sensorId string, sensorPin int, sensorData *LuftdatenSensorData) {
log.Debugf("[Luftdaten] %s: posting sensor [%d] data to %s", sensorId, sensorPin, lf.apiServerUrl)

headers := map[string]interface{}{
"X-Sensor": sensorId,
"X-Pin": sensorPin,
}

var r map[string]*json.RawMessage
if err := HttpPostData(lf.apiServerUrl, headers, sensorData, &r); err != nil {
log.Errorf("[Luftdaten] %s: sensor [%d] data posting failed: %v", sensorId, sensorPin, err)
return
}

log.Debugf("[Luftdaten] %s: successfully posted sensor [%d] data", sensorId, sensorPin)
}
27 changes: 23 additions & 4 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,42 @@ func HttpGetData(url string, res interface{}) error {
}

if r.StatusCode != http.StatusOK {
return fmt.Errorf("HTTP error %d, response:\n%s", r.StatusCode, b)
return fmt.Errorf("%d: %s", r.StatusCode, b)
}

return json.Unmarshal(b, &res)
}

func HttpPostData(url string, data, res interface{}) error {
func HttpPostData(url string, headers map[string]interface{}, data, res interface{}) error {
jd, err := json.Marshal(data)
if err != nil {
return err
}

r, err := httpClient.Post(url, "application/json", bytes.NewBuffer(jd))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jd))
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/json")
for k, v := range headers {
req.Header.Add(k, fmt.Sprintf("%v", v))
}

r, err := httpClient.Do(req)
if err != nil {
return err
}
defer CloseQuietly(r.Body)

return json.NewDecoder(r.Body).Decode(res)
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}

if r.StatusCode < http.StatusOK || r.StatusCode > http.StatusIMUsed {
return fmt.Errorf("%d: %s", r.StatusCode, b)
}

return json.Unmarshal(b, &res)
}
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func main() {

apiServerUrl := flag.String("a", "https://api.openair.city/v1/feeder", "feeder endpoint address")

updatePeriod := flag.Duration("t", 1*time.Minute, "data update period")
updateInterval := flag.Duration("t", 1*time.Minute, "data update interval")

keepDuration := flag.Duration("k", 6*time.Hour, "buffered data keep duration")

Expand Down Expand Up @@ -110,9 +110,10 @@ func main() {

feeders := []Feeder{
NewOpenAirFeeder(*apiServerUrl, *keepDuration),
NewLuftdatenFeeder(),
}

RunStation(ctx, station, feeders, *updatePeriod, *settleTime, *disablePmCorrectionFlag)
RunStation(ctx, station, feeders, *updateInterval, *settleTime, *disablePmCorrectionFlag)

log.Printf("exiting...")
}
4 changes: 2 additions & 2 deletions station.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (rs *RpiStation) GetData() (*StationData, error) {
}, nil
}

func RunStation(ctx context.Context, station Station, feeders []Feeder, updatePeriod time.Duration,
func RunStation(ctx context.Context, station Station, feeders []Feeder, updateInterval time.Duration,
settleTime time.Duration, disablePmCorrectionFlag bool) {
p := time.Duration(0)

Expand All @@ -305,7 +305,7 @@ func RunStation(ctx context.Context, station Station, feeders []Feeder, updatePe
for {
select {
case <-time.After(p):
p = updatePeriod
p = updateInterval

data, err := station.GetData()
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,22 @@ func Sha1(s string) string {
// Convert reference to float32 to its string representation
func Float32RefToString(r *float32) string {
if r == nil {
return "<none>"
return ""
}

return fmt.Sprintf("%.1f", *r)
}

// Round float32 x to given number of decimal places
// Round float32 to given number of decimal places
func Float32Round(x float32, places int) float32 {
pow := math.Pow(10, float64(places))
return float32(math.Round(pow*float64(x)) / pow)
}

// Round referenced float32 to given number of decimal places
func Float32RefRound(r *float32, places int) float32 {
if r == nil {
return 0
}
return Float32Round(*r, places)
}

0 comments on commit 9cf9790

Please sign in to comment.