Skip to content

Commit

Permalink
FFI v0.12.2 & RPC (#62)
Browse files Browse the repository at this point in the history
* build

* build

* ffi

* ffi

* rpc

* version

* remove

* perform

* handle

* fwd

* proto

* Changes

* rpc

* wip

* wip

* works

* working

* docstring

* remove some files

* gi

* fmt

* clean

* docs

* docs
  • Loading branch information
bcherry authored Nov 21, 2024
1 parent 47692c7 commit d90ee89
Show file tree
Hide file tree
Showing 33 changed files with 27,856 additions and 8,853 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,11 @@ crashlytics-build.properties
/[Aa]ssets/[Ss]treamingAssets/aa/*

.DS_Store

downloads~

/Runtime/Plugins/ffi-*/LICENSE.md
/Runtime/Plugins/ffi-*/LICENSE.md.meta
/Runtime/Plugins/ffi-*/livekit_ffi.h
/Runtime/Plugins/ffi-*/livekit_ffi.h.meta

3 changes: 2 additions & 1 deletion BuildScripts~/generate_proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ protoc \
$FFI_PROTOCOL/room.proto \
$FFI_PROTOCOL/stats.proto \
$FFI_PROTOCOL/track.proto \
$FFI_PROTOCOL/video_frame.proto
$FFI_PROTOCOL/video_frame.proto \
$FFI_PROTOCOL/rpc.proto
66 changes: 66 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,71 @@ void TrackSubscribed(IRemoteTrack track, RemoteTrackPublication publication, Rem
}
```

### RPC

Perform your own predefined method calls from one participant to another.

This feature is especially powerful when used with [Agents](https://docs.livekit.io/agents), for instance to forward LLM function calls to your client application.

The following is a brief overview but [more detail is available in the documentation](https://docs.livekit.io/home/client/data/rpc).

#### Registering an RPC method

The participant who implements the method and will receive its calls must first register support. Your method handler will be an async callback that receives an `RpcInvocationData` object:

```cs
// Define your method handler
async Task<string> HandleGreeting(RpcInvocationData data)
{
Debug.Log($"Received greeting from {data.CallerIdentity}: {data.Payload}");
return $"Hello, {data.CallerIdentity}!";
}

// Register the method after connection to the room
room.LocalParticipant.RegisterRpcMethod("greet", HandleGreeting);
```

In addition to the payload, `RpcInvocationData` also contains `responseTimeout`, which informs you the maximum time available to return a response. If you are unable to respond in time, the call will result in an error on the caller's side.

#### Performing an RPC request

The caller may initiate an RPC call using coroutines:

```cs
IEnumerator PerformRpcCoroutine()
{
var rpcCall = room.LocalParticipant.PerformRpc(new PerformRpcParams
{
DestinationIdentity = "recipient-identity",
Method = "greet",
Payload = "Hello from RPC!"
});

yield return rpcCall;

if (rpcCall.IsError)
{
Debug.Log($"RPC call failed: {rpcCall.Error}");
}
else
{
Debug.Log($"RPC response: {rpcCall.Payload}");
}
}

// Start the coroutine from another MonoBehaviour method
StartCoroutine(PerformRpcCoroutine());
```

You may find it useful to adjust the `ResponseTimeout` parameter, which indicates the amount of time you will wait for a response. We recommend keeping this value as low as possible while still satisfying the constraints of your application.

#### Errors

LiveKit is a dynamic realtime environment and RPC calls can fail for various reasons.

You may throw errors of the type `RpcError` with a string `message` in an RPC method handler and they will be received on the caller's side with the message intact. Other errors will not be transmitted and will instead arrive to the caller as `1500` ("Application Error"). Other built-in errors are detailed in the [docs](https://docs.livekit.io/home/client/data/rpc/#errors).



<!--BEGIN_REPO_NAV-->
<!--END_REPO_NAV-->
4 changes: 2 additions & 2 deletions Runtime/Plugins/ffi-ios-arm64/liblivekit_ffi.a
Git LFS file not shown
4 changes: 2 additions & 2 deletions Runtime/Plugins/ffi-ios-sim-arm64/liblivekit_ffi.a
Git LFS file not shown
4 changes: 2 additions & 2 deletions Runtime/Plugins/ffi-linux-arm64/liblivekit_ffi.so
Git LFS file not shown
4 changes: 2 additions & 2 deletions Runtime/Plugins/ffi-linux-x86_64/liblivekit_ffi.so
Git LFS file not shown
4 changes: 2 additions & 2 deletions Runtime/Plugins/ffi-macos-arm64/liblivekit_ffi.dylib
Git LFS file not shown
4 changes: 2 additions & 2 deletions Runtime/Plugins/ffi-macos-x86_64/liblivekit_ffi.dylib
Git LFS file not shown
4 changes: 2 additions & 2 deletions Runtime/Plugins/ffi-windows-arm64/livekit_ffi.dll
Git LFS file not shown
4 changes: 2 additions & 2 deletions Runtime/Plugins/ffi-windows-x86_64/livekit_ffi.dll
Git LFS file not shown
1 change: 0 additions & 1 deletion Runtime/Scripts/AudioSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public RtcAudioSource(AudioSource source, RtcAudioSourceType audioSourceType = R
newAudioSource.Options.EchoCancellation = true;
newAudioSource.Options.AutoGainControl = true;
newAudioSource.Options.NoiseSuppression = true;
newAudioSource.EnableQueue = false;
using var response = request.Send();
FfiResponse res = response;
_info = res.NewAudioSource.Source.Info;
Expand Down
28 changes: 19 additions & 9 deletions Runtime/Scripts/Internal/FFIClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

namespace LiveKit.Internal
{
#if UNITY_EDITOR
#if UNITY_EDITOR
[InitializeOnLoad]
#endif
#endif
internal sealed class FfiClient : IFFIClient
{
private static bool initialized = false;
Expand All @@ -37,10 +37,14 @@ internal sealed class FfiClient : IFFIClient
public event DisconnectReceivedDelegate? DisconnectReceived;
public event RoomEventReceivedDelegate? RoomEventReceived;
public event TrackEventReceivedDelegate? TrackEventReceived;
public event RpcMethodInvocationReceivedDelegate? RpcMethodInvocationReceived;

// participant events are not allowed in the fii protocol public event ParticipantEventReceivedDelegate ParticipantEventReceived;
public event VideoStreamEventReceivedDelegate? VideoStreamEventReceived;
public event AudioStreamEventReceivedDelegate? AudioStreamEventReceived;

public event PerformRpcReceivedDelegate? PerformRpcReceived;

public FfiClient() : this(Pools.NewFfiResponsePool(), new ArrayMemoryPool())
{
}
Expand All @@ -65,7 +69,7 @@ IMemoryPool memoryPool
this.ffiResponsePool = ffiResponsePool;
}

#if UNITY_EDITOR
#if UNITY_EDITOR
static FfiClient()
{
AssemblyReloadEvents.beforeAssemblyReload += OnBeforeAssemblyReload;
Expand Down Expand Up @@ -94,12 +98,12 @@ static void Init()

private static void Quit()
{
#if UNITY_EDITOR
#if UNITY_EDITOR
AssemblyReloadEvents.beforeAssemblyReload -= OnBeforeAssemblyReload;
AssemblyReloadEvents.afterAssemblyReload -= OnAfterAssemblyReload;
#endif
Instance.Dispose();
#endif
Instance.Dispose();

}

[RuntimeInitializeOnLoadMethod]
Expand All @@ -122,7 +126,7 @@ private static void InitializeSdk()
const bool captureLogs = false;
#endif

NativeMethods.LiveKitInitialize(FFICallback, captureLogs);
NativeMethods.LiveKitInitialize(FFICallback, captureLogs, "unity", ""); // TODO: Get SDK version

Utils.Debug("FFIServer - Initialized");
initialized = true;
Expand Down Expand Up @@ -198,7 +202,7 @@ out UIntPtr dataLen

[AOT.MonoPInvokeCallback(typeof(FFICallbackDelegate))]
static unsafe void FFICallback(UIntPtr data, UIntPtr size)
{
{
#if NO_LIVEKIT_MODE
return;
#endif
Expand Down Expand Up @@ -238,6 +242,9 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size)
case FfiEvent.MessageOneofCase.TrackEvent:
Instance.TrackEventReceived?.Invoke(r.TrackEvent!);
break;
case FfiEvent.MessageOneofCase.RpcMethodInvocation:
Instance.RpcMethodInvocationReceived?.Invoke(r.RpcMethodInvocation);
break;
case FfiEvent.MessageOneofCase.Disconnect:
Instance.DisconnectReceived?.Invoke(r.Disconnect!);
break;
Expand All @@ -251,6 +258,9 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size)
break;
case FfiEvent.MessageOneofCase.CaptureAudioFrame:
break;
case FfiEvent.MessageOneofCase.PerformRpc:
Instance.PerformRpcReceived?.Invoke(r.PerformRpc!);
break;
case FfiEvent.MessageOneofCase.GetStats:
case FfiEvent.MessageOneofCase.Panic:
break;
Expand Down
4 changes: 4 additions & 0 deletions Runtime/Scripts/Internal/FFIClients/FFIEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ namespace LiveKit.Internal

internal delegate void TrackEventReceivedDelegate(TrackEvent e);

internal delegate void RpcMethodInvocationReceivedDelegate(RpcMethodInvocationEvent e);


internal delegate void VideoStreamEventReceivedDelegate(VideoStreamEvent e);


internal delegate void AudioStreamEventReceivedDelegate(AudioStreamEvent e);

internal delegate void PerformRpcReceivedDelegate(PerformRpcCallback e);

}
27 changes: 27 additions & 0 deletions Runtime/Scripts/Internal/FFIClients/FfiRequestExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ public static void Inject<T>(this FfiRequest ffiRequest, T request)
case E2eeRequest e2EeRequest:
ffiRequest.E2Ee = e2EeRequest;
break;
// Rpc
case RegisterRpcMethodRequest registerRpcMethodRequest:
ffiRequest.RegisterRpcMethod = registerRpcMethodRequest;
break;
case UnregisterRpcMethodRequest unregisterRpcMethodRequest:
ffiRequest.UnregisterRpcMethod = unregisterRpcMethodRequest;
break;
case PerformRpcRequest performRpcRequest:
ffiRequest.PerformRpc = performRpcRequest;
break;
case RpcMethodInvocationResponseRequest rpcMethodInvocationResponseRequest:
ffiRequest.RpcMethodInvocationResponse = rpcMethodInvocationResponseRequest;
break;
default:
throw new Exception($"Unknown request type: {request?.GetType().FullName ?? "null"}");
}
Expand Down Expand Up @@ -132,6 +145,13 @@ public static void EnsureClean(this FfiRequest request)
|| request.NewAudioResampler != null
|| request.RemixAndResample != null
|| request.E2Ee != null
||

// Rpc
request.RegisterRpcMethod != null
|| request.UnregisterRpcMethod != null
|| request.PerformRpc != null
|| request.RpcMethodInvocationResponse != null
)
{
throw new InvalidOperationException("Request is not cleared");
Expand Down Expand Up @@ -181,6 +201,13 @@ public static void EnsureClean(this FfiResponse response)
|| response.NewAudioResampler != null
|| response.RemixAndResample != null
|| response.E2Ee != null
||

// Rpc
response.RegisterRpcMethod != null
|| response.UnregisterRpcMethod != null
|| response.PerformRpc != null
|| response.RpcMethodInvocationResponse != null
)
{
throw new InvalidOperationException("Response is not cleared: ");
Expand Down
2 changes: 1 addition & 1 deletion Runtime/Scripts/Internal/NativeMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ internal static class NativeMethods
internal extern static unsafe FfiHandleId FfiNewRequest(byte* data, int len, out byte* dataPtr, out UIntPtr dataLen);

[DllImport(Lib, CallingConvention = CallingConvention.Cdecl, EntryPoint = "livekit_ffi_initialize")]
internal extern static FfiHandleId LiveKitInitialize(FFICallbackDelegate cb, bool captureLogs);
internal extern static FfiHandleId LiveKitInitialize(FFICallbackDelegate cb, bool captureLogs, string sdk, string sdkVersion);
}
}
Loading

0 comments on commit d90ee89

Please sign in to comment.