- General Types
- Stream Definition
- Table Definition
- Window Definition
- Trigger Definition
- Aggregation Definition
- Function Definition
- Source & Sink Definition
- Query Definition
- Partition Definition
These are general structures that are used in most of the Siddhi element definitions. They are not standalone elements (that is why they do not have an 'id').
This is used to define a map of key-value pairs and is referred to as {Key-Value Pair JSON} wherever the structure is used.
{
`key`: `value`,
...
}
This defines the attributes of a Stream, Table, etc., which all have the same format. They therefore all use the same structure, which is named attributeList wherever it is used in an element definition.
[
{
name*: '',
type*: 'STRING|INT|LONG|DOUBLE|FLOAT|BOOL|OBJECT'
},
...
]
NOTE - A JSON attribute that ends with a * symbol cannot be left null/empty.
Example: To define the attributes (name string, age int), the JSON structure would look like this,
[
{
name: 'name',
type: 'STRING'
},
{
name: 'age',
type: 'INT'
}
]
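As a rough illustration of the mapping above, the following Python sketch turns a Siddhi attribute signature into an attributeList. The function name `to_attribute_list` and the `VALID_TYPES` set are hypothetical helpers introduced only for this example; they are not part of the editor code.

```python
# Attribute types taken from the attributeList structure above.
VALID_TYPES = {"STRING", "INT", "LONG", "DOUBLE", "FLOAT", "BOOL", "OBJECT"}


def to_attribute_list(signature):
    """Convert a signature such as '(name string, age int)' into an attributeList."""
    inner = signature.strip().strip("()")
    attributes = []
    for part in inner.split(","):
        if not part.strip():
            continue  # tolerate an empty signature such as '()'
        name, type_name = part.split()
        type_name = type_name.upper()
        if type_name not in VALID_TYPES:
            raise ValueError(f"unsupported attribute type: {type_name}")
        attributes.append({"name": name, "type": type_name})
    return attributes


print(to_attribute_list("(name string, age int)"))
```

The sketch assumes each attribute is written as exactly two whitespace-separated tokens (name, then type).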
This defines the annotations of any Siddhi element, as all annotations have roughly the same structure.
They are defined as part of a Siddhi element using the name annotationList, and have the following JSON structure.
["annotation1", "annotation2", ...]
Example: To define the annotations @Async(buffer.size='1024') @PrimaryKey('name','age'), the JSON structure would look like this,
["@Async(buffer.size='1024')", "@PrimaryKey('name', 'age')"]
Note - Sources, Sinks & Stores do not come under the general annotation structure, as they have a different structure and are shown separately.
The store annotation is only used for tables and aggregations, so it is only defined in the table and aggregation JSON structures, under the name store, in the following format:
{
type*: '',
options*: {Key-Value Pair JSON}
}
Example:
@Store(type="rdbms",
jdbc.url="jdbc:mysql://localhost:3306/production",
username="wso2",
password="123" ,
jdbc.driver.name="com.mysql.jdbc.Driver")
The JSON for the above store definition is,
{
type: 'rdbms',
options: {
'jdbc.url': 'jdbc:mysql://localhost:3306/production',
'username': 'wso2',
'password': '123',
'jdbc.driver.name': 'com.mysql.jdbc.Driver'
}
}
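To make the relationship between the store JSON and the @Store annotation concrete, here is a minimal Python sketch that renders one from the other. `render_store` is a hypothetical name, and quoting every option value with double quotes is an assumption based on the example above.

```python
def render_store(store):
    """Render a store JSON object as a single-line @Store annotation."""
    store_type = store.get("type")
    options = store.get("options")
    if not store_type or not options:
        # Both 'type' and 'options' are marked with * in the structure above.
        raise ValueError("store 'type' and 'options' are required")
    parts = ['type="' + store_type + '"']
    for key, value in options.items():
        parts.append(key + '="' + value + '"')
    return "@Store(" + ", ".join(parts) + ")"


store = {
    "type": "rdbms",
    "options": {
        "jdbc.url": "jdbc:mysql://localhost:3306/production",
        "username": "wso2",
        "password": "123",
        "jdbc.driver.name": "com.mysql.jdbc.Driver",
    },
}
print(render_store(store))
```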
Streams in queries can have multiple filters and functions, but only one window.
- If the query is of type window-filter-projection, then its window can be placed anywhere in the input stream element section.
- If the query is of type join, then its input stream element must end with a window (that is, if the input stream element has at least one function or filter).
- If the query is of type pattern & sequence, then its input stream element cannot have any window at all.
The JSON structure (denoted as streamHandlerList in other JSON structures) for an input stream element Filter/Function/Window in a query is given below,
[
{
type*: 'FILTER',
value*: ''
},
<< and|or >>
{
type*: 'FUNCTION|WINDOW',
value*: {
function*: '',
parameters: ['value1',...],
}
},
...
]
Example: Consider the following streamHandlerList of a plain window-filter-projection query
from Instream[name == 'Mark']#window.time(10 min)#str:tokenize('<Test>', '<Test Regex>')
select ...
The JSON for the above example would look like this,
[
{
type: 'FILTER',
value: 'name == \'Mark\''
},
{
type: 'WINDOW',
value: {
function: 'time',
parameters: ['10 min']
}
},
{
type: 'FUNCTION',
value: {
function: 'str:tokenize',
parameters: ['<Test>', '<Test Regex>']
}
}
]
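The window placement rules stated earlier in this section can be sketched as a small validator. This is only an illustration: the function name `validate_stream_handlers` and the query type labels passed to it ('window-filter-projection', 'join', 'pattern', 'sequence') are assumptions made for the example.

```python
def validate_stream_handlers(query_type, handlers):
    """Return a list of rule violations for a streamHandlerList."""
    errors = []
    windows = [h for h in handlers if h["type"] == "WINDOW"]
    if len(windows) > 1:
        errors.append("only one window is allowed")
    if query_type == "join" and handlers and handlers[-1]["type"] != "WINDOW":
        errors.append("a join input with filters or functions must end with a window")
    if query_type in ("pattern", "sequence") and windows:
        errors.append("pattern and sequence inputs cannot have a window")
    return errors


handlers = [
    {"type": "FILTER", "value": "name == 'Mark'"},
    {"type": "WINDOW", "value": {"function": "time", "parameters": ["10 min"]}},
    {"type": "FUNCTION", "value": {"function": "str:tokenize",
                                   "parameters": ["<Test>", "<Test Regex>"]}},
]
print(validate_stream_handlers("window-filter-projection", handlers))
print(validate_stream_handlers("join", handlers))
```

The same list is valid for a window-filter-projection query but not for a join, because its last handler is a function rather than a window.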
{
id*: '',
name*: '',
attributeList*: {Attributes JSON Array},
annotationList: {Annotations JSON Array}
}
Example:
@Async(buffer.size="1024")
define stream InStream (name string, age int);
The JSON for the above stream definition is,
{
id: 'InStream',
name: 'InStream',
isInnerStream: false,
attributeList: [
{
name: 'name',
type: 'STRING'
},
{
name: 'age',
type: 'INT'
}
],
annotationList: ["@Async(buffer.size='1024')"]
}
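As a sketch of how this JSON could be turned back into Siddhi code, the following Python function renders a stream definition. `render_stream` is a hypothetical helper, and printing the attribute types in lower case is an assumption based on the example above.

```python
def render_stream(stream):
    """Render a stream definition JSON object as Siddhi source text."""
    attributes = ", ".join(
        a["name"] + " " + a["type"].lower() for a in stream["attributeList"]
    )
    # Annotations are emitted one per line, before the definition itself.
    lines = list(stream.get("annotationList", []))
    lines.append("define stream " + stream["name"] + " (" + attributes + ");")
    return "\n".join(lines)


in_stream = {
    "id": "InStream",
    "name": "InStream",
    "isInnerStream": False,
    "attributeList": [
        {"name": "name", "type": "STRING"},
        {"name": "age", "type": "INT"},
    ],
    "annotationList": ["@Async(buffer.size='1024')"],
}
print(render_stream(in_stream))
```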
Note that if a stream is an inner stream, then its attributes cannot be defined and it cannot have any annotations. This case has to be handled in another way.
{
id*: '',
name*: '',
attributeList*: {Attributes JSON Array},
store: {Store JSON},
annotationList: {Annotations JSON Array}
}
Example:
define table InTable (name string, age int);
The JSON for the above table definition is,
{
id: 'InTable',
name: 'InTable',
attributeList: [
{
name: 'name',
type: 'STRING'
},
{
name: 'age',
type: 'INT'
}
],
store: {},
annotationList: []
}
{
id*: '',
name*: '',
attributeList*: {Attributes JSON Array},
function*: 'time|length|timeBatch|lengthBatch...',
parameters*: ['value1',...],
outputEventType: 'current_events|expired_events|all_events',
annotationList: {Annotations JSON Array}
}
Example:
define window SensorWindow (name string, value float) timeBatch(1 second) output expired events;
The JSON for the above window definition is,
{
id: 'SensorWindow',
name: 'SensorWindow',
attributeList: [
{
name: 'name',
type: 'STRING'
},
{
name: 'value',
type: 'FLOAT'
}
],
function: 'timeBatch',
parameters: ['1 second'],
outputEventType: 'expired_events',
annotationList: []
}
{
id*: '',
name*: '',
at*: '',
annotationList: {Annotations JSON Array}
}
Example:
define trigger FiveMinTrigger at every 5 min;
The JSON for the above trigger definition is,
{
id: 'FiveMinTrigger',
name: 'FiveMinTrigger',
at: 'every 5 min',
annotationList: []
}
{
id*: '',
name*: '',
from*: '',
select*: {Query Select JSON},
groupBy: ['value1',...],
aggregateByAttribute: '',
aggregateByTimePeriod*: {
type*: 'RANGE',
value*: {
min*: '',
max*: ''
}
<< or >>
type*: 'INTERVAL',
value*: ['sec', 'min', ...] // At least one value must be available
},
store: {Store JSON},
annotationList: {Annotations JSON Array}
}
Example:
define aggregation TradeAggregation
from TradeStream
select symbol, avg(price) as avgPrice, sum(price) as total
group by symbol
aggregate by timestamp every sec...year;
The JSON for the above aggregation definition is,
{
id: 'TradeAggregation',
name: 'TradeAggregation',
from: 'TradeStream',
select: {
type: 'user_defined',
value: [
{
expression: 'symbol',
as: ''
},
{
expression: 'avg(price)',
as: 'avgPrice'
},
{
expression: 'sum(price)',
as: 'total'
}
]
},
groupBy: ['symbol'],
aggregateByAttribute: 'timestamp',
aggregateByTimePeriod: {
type: 'RANGE',
value: {
min: 'sec',
max: 'year'
}
},
store: {},
annotationList: []
}
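The two shapes of aggregateByTimePeriod (RANGE and INTERVAL) map to different Siddhi text. The sketch below shows one way to render the aggregate clause; `render_aggregate_by` is a hypothetical helper written only for this illustration.

```python
def render_aggregate_by(attribute, time_period):
    """Render the 'aggregate ... every ...' clause of an aggregation."""
    if time_period["type"] == "RANGE":
        value = time_period["value"]
        period = value["min"] + "..." + value["max"]
    elif time_period["type"] == "INTERVAL":
        if not time_period["value"]:
            raise ValueError("an INTERVAL needs at least one value")
        period = ", ".join(time_period["value"])
    else:
        raise ValueError("unknown time period type: " + time_period["type"])
    clause = "aggregate"
    if attribute:
        # aggregateByAttribute is optional, so 'by <attribute>' may be absent.
        clause += " by " + attribute
    return clause + " every " + period


print(render_aggregate_by("timestamp",
                          {"type": "RANGE", "value": {"min": "sec", "max": "year"}}))
print(render_aggregate_by("", {"type": "INTERVAL", "value": ["sec", "min"]}))
```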
{
id*: '',
name*: '',
scriptType*: 'JAVASCRIPT|R|SCALA',
returnType*: 'STRING|INT|LONG|DOUBLE|FLOAT|BOOL|OBJECT',
body*: ''
}
Example:
define function concatFn[javascript] return string {
var str1 = data[0];
var str2 = data[1];
var str3 = data[2];
var responce = str1 + str2 + str3;
return responce;
};
The JSON for the above function definition is,
{
id: '',
name: 'concatFn',
scriptType: 'JAVASCRIPT',
returnType: 'STRING',
body: 'var str1 = data[0];\nvar str2 = data[1];\nvar str3 = data[2];\nvar responce = str1 + str2 + str3;\nreturn responce;'
}
{
id*: '',
annotationType*: 'SINK|SOURCE',
connectedElementName*: '',
type*: '',
options: ['option1', 'option2=value2',...],
map: {
type*: '',
options: {Key-Value Pair JSON},
payloadOrAttribute: {
annotationType: 'PAYLOAD|ATTRIBUTES',
type*: 'MAP',
value*: {Key-Value Pair JSON}
}
<< or >>
payloadOrAttribute: {
annotationType: 'PAYLOAD|ATTRIBUTES',
type*: 'LIST',
value*: ['value1',...]
}
}
}
IMPORTANT
- Map attributes for sources are either key-value pair JSON or list (no single), but the map attributes for sinks are either key-value pair JSON or single (no list).
- If the annotationType is SOURCE, then the map attributes are denoted using @attributes(...), but if the annotationType is SINK, then the map attributes are defined as @payload(...) in Siddhi.
Example 1 (Source):
@Source(
type = 'http',
receiver.url='http://localhost:8006/productionStream',
basic.auth.enabled='false',
@map(type='json')
)
The JSON for the above source definition is,
{
id: '<UUID>',
annotationType: 'SOURCE',
connectedElementName: '<StreamName||TriggerName>',
type: 'http',
options: ["receiver.url = 'http://localhost:8006/productionStream'", "basic.auth.enabled = 'false'"],
map: {
type: 'json',
options: [],
attributes: []
},
}
Example 2 (Sink):
@sink(
type='http',
publisher.url='http://localhost:8005/endpoint',
method='POST',
headers='Accept-Date:20/02/2017',
basic.auth.username='admin',
basic.auth.password='admin',
basic.auth.enabled='true',
@map(type='json')
)
The JSON for the above sink definition is,
{
id: '<UUID>',
annotationType: 'SINK',
connectedElementName: '<StreamName||TriggerName>',
type: 'http',
options: ["publisher.url = 'http://localhost:8005/endpoint'", "method = 'POST'", "headers = 'Accept-Date:20/02/2017'", "basic.auth.username = 'admin'", "basic.auth.password = 'admin'", "basic.auth.enabled = 'true'"],
map: {
type: 'json',
options: [],
attributes: []
}
}
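The following Python sketch shows how a source or sink JSON object of this shape could be rendered back into an annotation. `render_source_or_sink` is a hypothetical helper; it only handles the type, the options list and the map type, and deliberately ignores map options and payload/attribute mappings.

```python
def render_source_or_sink(definition):
    """Render a source/sink JSON object as a single-line annotation."""
    annotation = definition["annotationType"].lower()  # 'source' or 'sink'
    parts = ["type='" + definition["type"] + "'"]
    # Each entry in 'options' is already a complete option string.
    parts.extend(definition.get("options", []))
    mapping = definition.get("map")
    if mapping:
        parts.append("@map(type='" + mapping["type"] + "')")
    return "@" + annotation + "(" + ", ".join(parts) + ")"


source = {
    "id": "<UUID>",
    "annotationType": "SOURCE",
    "connectedElementName": "<StreamName||TriggerName>",
    "type": "http",
    "options": [
        "receiver.url = 'http://localhost:8006/productionStream'",
        "basic.auth.enabled = 'false'",
    ],
    "map": {"type": "json", "options": [], "attributes": []},
}
print(render_source_or_sink(source))
```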
All queries have the following JSON body structure:
{
id*: '',
queryInput*: {Query Input JSON},
select*: {Query Select JSON},
groupBy: ['value1',...],
orderBy: [
{
value*: '',
order: 'asc|desc'
},
...
],
limit: <long>,
having: '',
outputRateLimit: '',
queryOutput*: {Query Output JSON},
annotationList: {Annotations JSON Array}
}
The query input can be of the following types:
- Window-Filter-Projection
- Join
- Pattern & Sequence
Note that there are a few conditions for each window-filter-projection type, even though they all have the same JSON structure:
- If the type is window, then the query must have a window and can have an optional filter.
- If the type is filter, then the query must have a filter and no window.
- If the type is projection, then the query cannot have a filter or window.
{
type*: 'window|filter|projection',
from*: '',
streamHandlerList: {Stream Handler JSON}
}
Example:
from InputStream[age >= 18]#window.time(1 hour)[age < 30]
select ...
The JSON for the above Window-Filter-Projection input is,
{
type: 'window',
from: 'InputStream',
streamHandlerList: [
{
type: 'FILTER',
value: 'age >= 18'
},
{
type: 'WINDOW',
value: {
function: 'time',
parameters: ['1 hour']
}
},
{
type: 'FILTER',
value: 'age < 30'
}
]
}
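The three conditions on the type value can be read as a classification of the streamHandlerList. A minimal Python sketch, assuming a hypothetical helper named `classify_input` and assuming that a list containing only functions counts as a projection (the text above does not cover that case):

```python
def classify_input(handlers):
    """Derive the window-filter-projection 'type' from a streamHandlerList."""
    kinds = {h["type"] for h in handlers}
    if "WINDOW" in kinds:
        return "window"      # a window, with or without filters
    if "FILTER" in kinds:
        return "filter"      # at least one filter and no window
    return "projection"      # neither filter nor window


handlers = [
    {"type": "FILTER", "value": "age >= 18"},
    {"type": "WINDOW", "value": {"function": "time", "parameters": ["1 hour"]}},
    {"type": "FILTER", "value": "age < 30"},
]
print(classify_input(handlers))
```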
Please note that even triggers can be used in window-filter-projection queries, as they are treated as streams underneath in the Siddhi runtime.
A join query can be one of 5 types:
- Join Stream
- Join Table
- Join Aggregation
- Join Window
- Join Trigger
The way to identify a join query type is by using the joinWith attribute.
However, all 5 of these join types are defined using the same JSON structure.
{
type*: 'join',
joinWith*: 'stream|table|window|aggregation|trigger',
left*: {Join Element JSON},
joinType*: 'join|left_outer|right_outer|full_outer',
right*: {Join Element JSON},
on: '',
within: '', // If joinWith == aggregation
per: '' // If joinWith == aggregation
}
The Join Element JSON
has the following structure:
{
type*: 'stream|table|window|aggregation|trigger',
from*: '',
streamHandlerList: {Stream Handler JSON}, // If there is a filter, there must be a window for joins (the only exception is when type = window).
as: '',
isUnidirectional: true|false // Only one 'isUnidirectional' value can be true at a time (either left definition|right definition|none)
}
There are a few conditions that must be met for a join query input to be valid:
- At least one of the left or right JSON values must be of type stream or trigger; otherwise it is not a valid join query input.
- If a Join Element JSON is of type window, then that element's window attribute must be null. This is because a window definition cannot have another window within it.
- If there is a Join Element JSON of type aggregation, then the within and per attributes in the JSON structure cannot be null. If there is no aggregation definition, then those attributes have to be null.
- If a Join Element JSON has a filter, then it must have a window as well, or else it is invalid (except if that element is of type window definition).
- Only one Join Element JSON can be marked as isUnidirectional: true. It is either the left definition, the right definition, or neither of them.
Example:
from StockStream as S join TradeAggregation as T
on S.symbol == T.symbol
within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30"
per "days"
select ...
The JSON for the above Join Aggregation input is,
{
type: 'join',
joinWith: 'aggregation',
left: {
type: 'stream',
from: 'StockStream',
streamHandlerList: [],
as: 'S',
isUnidirectional: false
},
joinType: 'join',
right: {
type: 'aggregation',
from: 'TradeAggregation',
streamHandlerList: [],
as: 'T',
isUnidirectional: false
},
on: 'S.symbol == T.symbol',
within: '\"2014-02-15 00:00:00 +05:30\", \"2014-03-16 00:00:00 +05:30\"',
per: '\"days\"'
}
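The validity conditions for a join query input can be sketched as a checker over this JSON. The function name `validate_join` is hypothetical, and treating an empty string the same as null for within and per is an assumption.

```python
def validate_join(join):
    """Return a list of violated join conditions (empty if the input is valid)."""
    errors = []
    sides = [join["left"], join["right"]]
    if not any(s["type"] in ("stream", "trigger") for s in sides):
        errors.append("at least one side must be a stream or trigger")
    for side in sides:
        kinds = [h["type"] for h in side.get("streamHandlerList", [])]
        if side["type"] == "window" and "WINDOW" in kinds:
            errors.append("a window definition cannot contain another window")
        if side["type"] != "window" and "FILTER" in kinds and "WINDOW" not in kinds:
            errors.append("a filter requires a window in a join element")
    has_aggregation = any(s["type"] == "aggregation" for s in sides)
    within, per = join.get("within"), join.get("per")
    if has_aggregation and not (within and per):
        errors.append("'within' and 'per' are required when joining an aggregation")
    if not has_aggregation and (within or per):
        errors.append("'within' and 'per' are only allowed with an aggregation")
    if all(s.get("isUnidirectional") for s in sides):
        errors.append("only one side can be unidirectional")
    return errors


join = {
    "type": "join",
    "joinWith": "aggregation",
    "left": {"type": "stream", "from": "StockStream", "streamHandlerList": [],
             "as": "S", "isUnidirectional": False},
    "joinType": "join",
    "right": {"type": "aggregation", "from": "TradeAggregation",
              "streamHandlerList": [], "as": "T", "isUnidirectional": False},
    "on": "S.symbol == T.symbol",
    "within": '"2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30"',
    "per": '"days"',
}
print(validate_join(join))
```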
The JSON structure for both patterns & sequences is identical:
{
type*: 'pattern|sequence',
conditionList*: [
{
conditionId*: '',
streamName*: '',
streamHandlerList: {Stream Handler JSON}
},
...
],
logic*: ''
}
Example:
from every event1=InStream[age < 100]<21:234> within 10 min ->
every event2=InStream[age > 30] and event3=InStream[age < 50]->
every not InStream[age >= 18] for 5 sec ->
every not InStream[age < 18] and event6=InStream[age > 30]
select ...
The above pattern input is defined by the following JSON structure:
{
type: 'pattern',
conditionList: [
{
conditionId: 'event1',
streamName: 'InStream',
streamHandlerList: [
{
type: 'FILTER',
value: 'age < 100'
}
]
},
{
conditionId: 'event2',
streamName: 'InStream',
streamHandlerList: [
{
type: 'FILTER',
value: 'age > 30'
}
]
},
{
conditionId: 'event3',
streamName: 'InStream',
streamHandlerList: [
{
type: 'FILTER',
value: 'age < 50'
}
]
},
{
conditionId: 'event4',
streamName: 'InStream',
streamHandlerList: [
{
type: 'FILTER',
value: 'age >= 18'
}
]
},
{
conditionId: 'event5',
streamName: 'InStream',
streamHandlerList: [
{
type: 'FILTER',
value: 'age < 18'
}
]
},
{
conditionId: 'event6',
streamName: 'InStream',
streamHandlerList: [
{
type: 'FILTER',
value: 'age > 30'
}
]
}
],
logic: 'every event1<21:234> within 10 min -> every event2 and event3 -> every not event4 for 5 sec -> every not event5 and event6'
}
Note - If there is a not statement before a conditionId in the logic attribute, then that conditionId will not be displayed in the source view.
For example:
{
type: 'sequence',
conditionList: [
{
conditionId: 'e1',
streamName: 'InStream',
streamHandlerList: [
{
type: 'FILTER',
value: 'age >= 18'
}
]
},
{
conditionId: 'e2',
streamName: 'InStream',
streamHandlerList: [
{
type: 'FILTER',
value: 'age < 30'
}
]
}
],
logic: 'every e1+ within 10 min, not e2 for 10 min'
}
The Siddhi code for the above JSON would look like this:
from
every e1=InStream[age >= 18]+ within 10 min,
not InStream[age < 30] for 10 min
select ...
Notice that the second sequence condition, not InStream[age < 30] for 10 min, does not have an "e2=" reference like e1 does, because it has a not statement before it.
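One way to picture how the logic attribute and the conditionList combine is to expand each conditionId in the logic string into its stream reference, dropping the id when it is preceded by not. The Python sketch below does this with a single regular-expression pass. The names `render_condition` and `render_logic` are hypothetical, and only FILTER handlers are rendered.

```python
import re


def render_condition(condition):
    """Render 'StreamName[filter]...' for one condition (filters only)."""
    filters = "".join(
        "[" + h["value"] + "]"
        for h in condition.get("streamHandlerList", [])
        if h["type"] == "FILTER"
    )
    return condition["streamName"] + filters


def render_logic(pattern_input):
    """Expand every conditionId in 'logic' into its stream reference."""
    by_id = {c["conditionId"]: c for c in pattern_input["conditionList"]}
    # Longest ids first so that 'e10' is not matched as 'e1'.
    ids = "|".join(re.escape(i) for i in sorted(by_id, key=len, reverse=True))
    regex = re.compile(rf"(\bnot\s+)?\b({ids})\b")

    def replace(match):
        stream = render_condition(by_id[match.group(2)])
        if match.group(1):
            return "not " + stream  # a negated condition has no 'id=' prefix
        return match.group(2) + "=" + stream

    return regex.sub(replace, pattern_input["logic"])


sequence = {
    "type": "sequence",
    "conditionList": [
        {"conditionId": "e1", "streamName": "InStream",
         "streamHandlerList": [{"type": "FILTER", "value": "age >= 18"}]},
        {"conditionId": "e2", "streamName": "InStream",
         "streamHandlerList": [{"type": "FILTER", "value": "age < 30"}]},
    ],
    "logic": "every e1+ within 10 min, not e2 for 10 min",
}
print(render_logic(sequence))
```

Because the substitution runs in one pass over the original logic string, text inserted for one condition is never rescanned for other conditionIds.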
{
type*: 'user_defined',
value*: [
{
expression*: '',
as: ''
},
...
]
<< or >>
type*: 'all',
value*: '*'
}
Example:
select Stream1.id as UID, avg(price) as avgPrice, ((TempStream.temp - 32) * 5)/9 as Celsius, isInStock ...
The JSON for the above select function is,
{
type: 'user_defined',
value: [
{
expression: 'Stream1.id',
as: 'UID'
},
{
expression: 'avg(price)',
as: 'avgPrice'
},
{
expression: '((TempStream.temp - 32) * 5)/9',
as: 'Celsius'
},
{
expression: 'isInStock',
as: ''
}
]
}
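As an illustration of the two shapes of the select JSON, this Python sketch renders either form back into a select clause. `render_select` is a hypothetical helper, and an empty `as` value is taken to mean that the expression has no alias.

```python
def render_select(select):
    """Render a Query Select JSON object as a Siddhi select clause."""
    if select["type"] == "all":
        return "select *"
    parts = []
    for item in select["value"]:
        alias = item.get("as")
        if alias:
            parts.append(item["expression"] + " as " + alias)
        else:
            parts.append(item["expression"])
    return "select " + ", ".join(parts)


select = {
    "type": "user_defined",
    "value": [
        {"expression": "Stream1.id", "as": "UID"},
        {"expression": "avg(price)", "as": "avgPrice"},
        {"expression": "((TempStream.temp - 32) * 5)/9", "as": "Celsius"},
        {"expression": "isInStock", "as": ""},
    ],
}
print(render_select(select))
```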
All query outputs have the following generic structure:
{
type*: 'insert|delete|update|update_or_insert_into',
output*: {INSERT JSON|DELETE JSON|UPDATE JSON|UPDATE-OR-INSERT JSON},
target*: ''
}
The output attribute can be one of 4 JSON structures:
- Insert
- Delete
- Update
- Update or insert into
{
eventType: 'current_events|expired_events|all_events'
}
Example:
insert all events into LogStream;
The JSON for the above insert function is,
{
type: 'insert',
output: {
eventType: 'all_events'
},
target: 'LogStream'
}
{
eventType: 'current_events|expired_events|all_events',
on*: ''
}
Example:
delete RoomTypeTable
for all events
on RoomTypeTable.roomNo == roomNumber;
The JSON for the above delete function is,
{
type: 'delete',
output: {
eventType: 'all_events',
on: 'RoomTypeTable.roomNo == roomNumber'
},
target: 'RoomTypeTable'
}
{
eventType: 'current_events|expired_events|all_events',
set: [
{
attribute*: '',
value*: ''
},
...
],
on*: ''
}
Example:
update RoomTypeTable
set RoomTypeTable.people = RoomTypeTable.people + arrival - exit
on RoomTypeTable.roomNo == roomNumber;
The JSON for the above update function is,
{
type: 'update',
output: {
eventType: '',
set: [
{
attribute: 'RoomTypeTable.people',
value: 'RoomTypeTable.people + arrival - exit'
}
],
on: 'RoomTypeTable.roomNo == roomNumber'
},
target: 'RoomTypeTable'
}
{
eventType: 'current_events|expired_events|all_events',
set: [
{
attribute*: '',
value*: ''
},
...
],
on*: ''
}
Example:
update or insert into RoomAssigneeTable
set RoomAssigneeTable.assignee = assignee
on RoomAssigneeTable.roomNo == roomNo;
The JSON for the above update or insert into function is,
{
type: 'update_or_insert_into',
output: {
eventType: '',
set: [
{
attribute: 'RoomAssigneeTable.assignee',
value: 'assignee'
}
],
on: 'RoomAssigneeTable.roomNo == roomNo'
},
target: 'RoomAssigneeTable'
}
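The four output structures differ mainly in which clauses they carry. The Python sketch below renders a query output JSON object back into Siddhi text. `render_output` is a hypothetical helper; it accepts both 'all' and 'all_events' style event types and both hyphenated and underscored type names, which is an assumption made to tolerate either spelling.

```python
def render_output(query_output):
    """Render a Query Output JSON object as a Siddhi output clause."""
    kind = query_output["type"].replace("-", "_")
    output = query_output["output"]
    target = query_output["target"]
    event = output.get("eventType", "").replace("_events", "")
    if kind == "insert":
        events = event + " events " if event else ""
        return "insert " + events + "into " + target + ";"
    verbs = {
        "delete": "delete",
        "update": "update",
        "update_or_insert_into": "update or insert into",
    }
    if kind not in verbs:
        raise ValueError("unknown output type: " + query_output["type"])
    clause = verbs[kind] + " " + target
    if event:
        clause += " for " + event + " events"
    if output.get("set"):
        pairs = ", ".join(s["attribute"] + " = " + s["value"] for s in output["set"])
        clause += " set " + pairs
    return clause + " on " + output["on"] + ";"


print(render_output({"type": "insert", "output": {"eventType": "all"},
                     "target": "LogStream"}))
```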
{
id*: '',
queryLists: [
{
'<queryType>': [{Query JSON},...]
},
...
],
innerStreamList: [{Stream Definition JSON},...],
partitionWith*: [
{
streamName*: '',
expression*: ''
},
...
],
annotationList: {Annotations JSON Array}
}
Example:
@info(name='TestPartition')
partition with ( roomNo >= 1030 as 'serverRoom' or
roomNo < 1030 and roomNo >= 330 as 'officeRoom' or
roomNo < 330 as 'lobby' of TempStream)
begin
@info(name='TestQuery1')
from TempStream
select *
insert into #OutputStream;
@info(name='TestQuery2')
from TempStream#window.time(10 min)
select roomNo, deviceID, avg(temp) as avgTemp
insert into AreaTempStream;
end;
The JSON for the above partition is,
{
id: 'TestPartition',
queryLists: [
{
'<queryType>': [{Query JSON},...]
},
...
],
innerStreamList: [{#OutputStream JSON}],
partitionWith: [
{
expression: 'roomNo >= 1030 as \'serverRoom\' or roomNo < 1030 and roomNo >= 330 as \'officeRoom\' or roomNo < 330 as \'lobby\'',
streamName: 'TempStream'
}
],
annotationList: ['@info(name=\'TestPartition\')']
}