Skip to content

Commit

Permalink
Merge branch 'main' into synchrophasor
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardalee committed Aug 13, 2024
2 parents 2884420 + 8e8a783 commit ad2bcfe
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 0 deletions.
34 changes: 34 additions & 0 deletions examples/Python/src/Audio/Spectrum.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
target Python {
keepalive: true # Needed because of the physical action in the Microphone reactor.
}

import Microphone from "../lib/Audio.lf"
import VectorPlot from "../lib/Plotters.lf"

preamble {=
import numpy as np
=}

reactor FFT {
input x
output y

reaction(x) -> y {=
s = np.fft.rfft(x.value)
y.set(np.abs(s))
=}
}

main reactor {
m = new Microphone(block_size=1024)
f = new FFT()
p = new VectorPlot(
size = [10, 5],
title="Spectrum",
xlabel = "Frequency (Hz)",
ylabel="Magnitude",
ylim = [0, 10],
xrange = {= [0, 16000/2 + 1, 16000/1024] =})
m.audio_data -> f.x
f.y -> p.y
}
72 changes: 72 additions & 0 deletions examples/Python/src/lib/Audio.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* @file
* @author Vincenzo Barbuto
* @author Edward A. Lee
* @brief Basic audio library for Lingua Franca Python target.
*/
target Python {
keepalive: true
}

/**
* @brief A reactor that produces audio data captured from a microphone.
*
* This reactor outputs a series of numpy arrays of audio samples. The size of each array is equal
* to the `block_size` parameter. Each array element will be a `float32` value if the number of
* channels is 1, or an array of `float32` values if the number of channels is greater than 1.
*
* The logical time of the output is the physical time at which the audio hardware calls a callback
* function, which occurs when a buffer of length block_size has been filled with audio samples.
*
* If your machine has more than one microphone, you can select one by specifying the device index.
* To see what devices are available, run `python3 -m sounddevice`.
*
* To use this reactor, you must install the sounddevice and numpy libraries for Python. You can do
* this with `pip install sounddevice numpy`.
*/
reactor Microphone(block_size=16000, sample_rate=16000, channels=1, device = {= None =}) {
physical action send_audio_data
output audio_data

preamble {=
import sounddevice as sd
import numpy as np
=}

reaction(startup) -> send_audio_data {=
def callback(indata, frames, time, status):
if status:
print(status)
input_data = indata.astype(self.np.float32)
if (len(input_data)):
if self.channels == 1:
input_data = self.np.array(self.np.transpose(input_data)[0])
send_audio_data.schedule(0, input_data)

self.stream = self.sd.InputStream(
channels=self.channels,
samplerate=self.sample_rate,
callback=callback,
blocksize=self.block_size,
device=self.device)
self.stream.start()
=}

reaction(send_audio_data) -> audio_data {=
audio_data.set(send_audio_data.value)
=}

reaction(shutdown) {=
if self.stream:
self.stream.close()
=}
}

/** @brief A test reactor that prints arrays of audio data captured from a microphone. */
main reactor {
m = new Microphone()

reaction(m.audio_data) {=
print(m.audio_data.value, " at time ", lf.time.logical_elapsed())
=}
}
96 changes: 96 additions & 0 deletions examples/Python/src/lib/Plotters.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* @file
* @author Edward A. Lee
* @brief Reactors for plotting signals.
*/
target Python {
timeout: 10 s
}

/**
* @brief A reactor that plots a sequence of vectors, where each vector plot replaces the previous
* one.
*
* The `size` parameter is a tuple of two values: the width and height of the plot in inches.
*
* The `title` parameter is a string that is displayed above the plot. The `xlabel` and `ylabel`
* parameters provide strings to label the x and y axes, respectively.
*
* The `ylim` parameter is a tuple of two values: the lower limit and the upper limit of the y axis.
* If no ylim is specified or it is not a tuple with two value, then the default range is determined
* by the limits of the first vector provided.
*
* The `xrange` parameter is a tuple of three values: the start of the x axis, the end of the x
* axis, and the step size. If `xrange` is `None` (the default), the x axis will be set to the
* length of the first input vector.
*
* To use this reactor, you must install the matplotlib and numpy libraries for Python. You can do
* this with `pip install matplotlib numpy`. See [matplotlib
* documentation](https://matplotlib.org/stable/) for more information.
*/
reactor VectorPlot(
size = {= None =},
title = {= None =},
xlabel = {= None =},
xrange = {= None =},
ylabel = {= None =},
ylim = {= None =}) {
preamble {=
import matplotlib.pyplot as plt
import numpy as np
=}

input y
state showing = False
state figure = {= None =}
state axes = {= None =}
state line1 = {= None =}

reaction(y) {=
if not self.showing:
# First vector to plot. Set up the plot.
# to run GUI event loop
self.plt.ion()
self.figure, self.axes = self.plt.subplots(figsize=self.size)
if (self.title):
self.axes.set_title(self.title)
if (self.xlabel):
self.axes.set_xlabel(self.xlabel)
if (self.ylabel):
self.axes.set_ylabel(self.ylabel)
if (self.ylim) and len(self.ylim) == 2:
self.axes.set_ylim(self.ylim[0], self.ylim[1])

# Determine the x axis.
xrange = self.xrange
if not xrange:
xrange = [0, len(y.value), 1]
x = self.np.arange(xrange[0], xrange[1], xrange[2])

# Plot the data.
self.line1, = self.axes.plot(x, y.value)

self.showing = True
else:
self.line1.set_ydata(y.value)

self.figure.canvas.draw()
self.figure.canvas.flush_events()
=}
}

main reactor {
preamble {=
import numpy as np
=}
state count = 1
timer t(0, 1 s)
v = new VectorPlot(title = "Sine wave", xlabel = "X Axis", ylabel = "Y Axis")

reaction(t) -> v.y {=
x = self.np.linspace(0, 2 * self.np.pi * self.count, 200)
y = self.np.sin(x)
v.y.set(y)
self.count += 1
=}
}

0 comments on commit ad2bcfe

Please sign in to comment.