From 97c8b01d82c87c0682fecb28ea728c7d9d350ca1 Mon Sep 17 00:00:00 2001 From: joao caixinha Date: Wed, 4 Jan 2017 15:25:57 +0000 Subject: [PATCH] Added guaranteed message delivery methods --- Pod/Classes/OrtcClient/OrtcClient.h | 170 ++++++---- Pod/Classes/OrtcClient/OrtcClient.m | 505 ++++++++++++++++++++-------- RealtimeMessaging-iOS.podspec | 2 +- 3 files changed, 481 insertions(+), 196 deletions(-) diff --git a/Pod/Classes/OrtcClient/OrtcClient.h b/Pod/Classes/OrtcClient/OrtcClient.h index 4899373..63502d4 100644 --- a/Pod/Classes/OrtcClient/OrtcClient.h +++ b/Pod/Classes/OrtcClient/OrtcClient.h @@ -96,18 +96,18 @@ { @private - OrtcClient* ortcClient; - void (^onMessage)(OrtcClient* ortc, NSString* channel, NSString* message); - // ... + OrtcClient* ortcClient; + void (^onMessage)(OrtcClient* ortc, NSString* channel, NSString* message); + // ... } // ... @end - -- ViewController.m + + - ViewController.m

  #import "ViewController.h"
@@ -115,91 +115,91 @@
  
  - (void)viewDidLoad
  {
-    [super viewDidLoad];
-    
-    // Instantiate OrtcClient
-    ortcClient = [OrtcClient ortcClientWithConfig:self];
-    
-    // Post permissions
-    @try {
-        NSMutableDictionary* myPermissions = [[NSMutableDictionary alloc] init];
-        
-        [myPermissions setObject:@"w" forKey:@"channel1"];
-        [myPermissions setObject:@"w" forKey:@"channel2"];
-        [myPermissions setObject:@"r" forKey:@"channelread"];
-        
-        BOOL postResult = [ortcClient saveAuthentication:@"http://ortc_server"
-                            isCLuster:YES authenticationToken:@"myAuthenticationToken"
-                            authenticationTokenIsPrivate:NO applicationKey:@"myApplicationKey"
-                            timeToLive:1800 privateKey:@"myPrivateKey" permissions:myPermissions];
-        
-        if (postResult) {
-            // Permissions correctly posted
-        }
-        else {
-            // Unable to post permissions
-        }
-    }
-    @catch (NSException* exception) {
-        // Exception posting permissions
-    }
-    
-    // Set connection properties
-    [ortcClient setConnectionMetadata:@"clientConnMeta"];
-    [ortcClient setClusterUrl:@"http://ortc_server"];
-    
-    // Connect
-    [ortcClient connect:@"myApplicationKey" authenticationToken:@"myAuthenticationToken"];
+ [super viewDidLoad];
+ 
+ // Instantiate OrtcClient
+ ortcClient = [OrtcClient ortcClientWithConfig:self];
+ 
+ // Post permissions
+ @try {
+ NSMutableDictionary* myPermissions = [[NSMutableDictionary alloc] init];
+ 
+ [myPermissions setObject:@"w" forKey:@"channel1"];
+ [myPermissions setObject:@"w" forKey:@"channel2"];
+ [myPermissions setObject:@"r" forKey:@"channelread"];
+ 
+ BOOL postResult = [ortcClient saveAuthentication:@"http://ortc_server"
+ isCLuster:YES authenticationToken:@"myAuthenticationToken"
+ authenticationTokenIsPrivate:NO applicationKey:@"myApplicationKey"
+ timeToLive:1800 privateKey:@"myPrivateKey" permissions:myPermissions];
+ 
+ if (postResult) {
+ // Permissions correctly posted
+ }
+ else {
+ // Unable to post permissions
+ }
+ }
+ @catch (NSException* exception) {
+ // Exception posting permissions
+ }
+ 
+ // Set connection properties
+ [ortcClient setConnectionMetadata:@"clientConnMeta"];
+ [ortcClient setClusterUrl:@"http://ortc_server"];
+ 
+ // Connect
+ [ortcClient connect:@"myApplicationKey" authenticationToken:@"myAuthenticationToken"];
  }
  
  - (void) onConnected:(OrtcClient*) ortc
  {
-    // Connected
-    onMessage = ^(OrtcClient* ortc, NSString* channel, NSString* message) {
-    // Received message 'message' at channel 'channel'
-        [ortcClient unsubscribe:channel];
-    };
-    
-    [ortcClient subscribe:@"channel1" subscribeOnReconnected:YES onMessage:onMessage];
-    [ortcClient subscribe:@"channel2" subscribeOnReconnected:NO onMessage:onMessage];
-    [ortcClient subscribeWithNotifications:@"channel3" subscribeOnReconnected:YES onMessage:onMessage];
-}
-
+ // Connected
+ onMessage = ^(OrtcClient* ortc, NSString* channel, NSString* message) {
+ // Received message 'message' at channel 'channel'
+ [ortcClient unsubscribe:channel];
+ };
+ 
+ [ortcClient subscribe:@"channel1" subscribeOnReconnected:YES onMessage:onMessage];
+ [ortcClient subscribe:@"channel2" subscribeOnReconnected:NO onMessage:onMessage];
+ [ortcClient subscribeWithNotifications:@"channel3" subscribeOnReconnected:YES onMessage:onMessage];
+ }
+ 
  - (void) onDisconnected:(OrtcClient*) ortc
  {
-    // Disconnected
+ // Disconnected
  }
-
+ 
  - (void) onReconnecting:(OrtcClient*) ortc
  {
-    // Trying to reconnect
+ // Trying to reconnect
  }
  
  - (void) onReconnected:(OrtcClient*) ortc
  {
-    // Reconnected
+ // Reconnected
  }
  
  - (void) onSubscribed:(OrtcClient*) ortc channel:(NSString*) channel
  {
-    // Subscribed to the channel 'channel'
-    [ortcClient send:channel message:@"Message to the channel"];
+ // Subscribed to the channel 'channel'
+ [ortcClient send:channel message:@"Message to the channel"];
  }
-
+ 
  - (void) onUnsubscribed:(OrtcClient*) ortc channel:(NSString*) channel
  {
-    // Unsubscribed from the channel 'channel'
-    [ortcClient disconnect];
+ // Unsubscribed from the channel 'channel'
+ [ortcClient disconnect];
  }
-
+ 
  - (void) onException:(OrtcClient*) ortc error:(NSError*) error
  {
-    // Exception occurred
+ // Exception occurred
  }
-
+ 
  @end
  
-*/ + */ @@ -217,6 +217,8 @@ @property (assign) int connectionTimeout; @property (assign) BOOL isConnected; +@property (nonatomic, retain) NSMutableDictionary* pendingPublishMessages; +@property (assign) int publishTimeout; ///--------------------------------------------------------------------------------------- /// @name Class Methods @@ -249,6 +251,17 @@ * @param message The message to send. */ - (void)send:(NSString*) channel message:(NSString*) message; + +/** + * Publish a message to a channel. + * + * @param channel The channel name. + * @param message The message to publish. + * @param ttl The message expiration time in seconds (0 for maximum allowed ttl). + * @param callback Returns error if message publish was not successful or published message unique id (seqId) if sucessfully published + */ +- (void)publish:(NSString*)channel message:(NSString*)aMessage ttl:(NSNumber*)ttl callback:(void(^)(NSError* error, NSString* seqId))callback; + /** * Subscribes to a channel to receive messages sent to it. * @@ -258,6 +271,35 @@ */ - (void)subscribe:(NSString*) channel subscribeOnReconnected:(BOOL) aSubscribeOnReconnected onMessage:(void (^)(OrtcClient* ortc, NSString* channel, NSString* message)) onMessage; +/** + * Subscribes to a channel to receive messages sent to it with given options. + * + * @param options The subscription options dictionary, EX: "options = { + * channel, + * subscribeOnReconnected, // optional, default = true + * regId, // optional, default = "", device token for push notifications as in subscribeWithNotifications + * pushPlatform, // optional, default = "", push notifications platform as in subscribeWithNotifications + * filter, // optional, default = "", the subscription filter as in subscribeWithFilter + * subscriberId // optional, default = "", the subscriberId as in subscribeWithBuffer + * }". + * @param onMessageWithOptionsCallback The callback called when a message arrives at the channel, data is provided in a dictionary EX: "msgOptions = { + * channel, + * seqId, // the message unique identifier + * filtered, // true if message was properly filtered using the subscription filter + * message, the message received + * }". + */ +- (void)subscribeWithOptions:(NSDictionary*)options onMessageWithOptionsCallback:(void (^)(OrtcClient* ortc, NSDictionary* msgOptions)) onMessageWithOptionsCallback; + +/** + * Subscribes to a channel to receive messages published to it. + * + * @param channel The channel name. + * @param subscriberId The subscriberId associated to the channel. + * @param onMessageWithBufferCallback The callback called when a message arrives at the channel. + */ +- (void)subscribeWithBuffer:(NSString*)channel subscriberId:(NSString*)subscriberId +onMessageWithBufferCallback:(void (^)(OrtcClient* ortc, NSString* channel, NSString* seqId, NSString* message))onMessageWithBufferCallback; /** * Subscribes to a channel, with a filter, to receive messages sent to it that validate the given filter. diff --git a/Pod/Classes/OrtcClient/OrtcClient.m b/Pod/Classes/OrtcClient/OrtcClient.m index 56e25cb..9fdbad8 100644 --- a/Pod/Classes/OrtcClient/OrtcClient.m +++ b/Pod/Classes/OrtcClient/OrtcClient.m @@ -86,9 +86,13 @@ @interface ChannelSubscription : NSObject @property (assign) BOOL subscribeOnReconnected; @property (assign) BOOL withNotifications; @property (assign) BOOL withFilter; +@property (assign) BOOL withOptions; @property (nonatomic, retain) NSString *filter; +@property (nonatomic, retain) NSString *subscriberId; +@property (nonatomic, retain) NSString *regId; @property (nonatomic, strong) void (^onMessage)(OrtcClient* ortc, NSString* channel, NSString* message); @property (nonatomic, strong) void (^onMessageWithFilter)(OrtcClient* ortc, NSString* channel, BOOL filtered, NSString* message); +@property (nonatomic, strong) void (^onMessageWithOptions)(OrtcClient* ortc, NSDictionary* msgOptions); @end @@ -126,7 +130,8 @@ @implementation OrtcClient opValidate, opSubscribe, opUnsubscribe, - opException + opException, + opAck } opCodes; typedef enum { @@ -147,6 +152,7 @@ @implementation OrtcClient NSString* const CHANNEL_PATTERN = @"^\\\\\"ch\\\\\":\\\\\"(.*?)\\\\\"$"; NSString* const EXCEPTION_PATTERN = @"^\\\\\"ex\\\\\":\\{(\\\\\"op\\\\\":\\\\\"(.*?[^\"]+)\\\\\",)?(\\\\\"ch\\\\\":\\\\\"(.*?)\\\\\",)?\\\\\"ex\\\\\":\\\\\"(.*?)\\\\\"\\}$"; NSString* const RECEIVED_PATTERN = @"^a\\[\"\\{\\\\\"ch\\\\\":\\\\\"(.*?)\\\\\",\\\\\"m\\\\\":\\\\\"([\\s\\S]*?)\\\\\"\\}\"\\]$"; +NSString* const JSON_PATTERN = @"^a\\[\"(.*?)\"\\]$"; NSString* const RECEIVED_PATTERN_FILTERED = @"^a\\[\"\\{\\\\\"ch\\\\\":\\\\\"(.*?)\\\\\",\\\\\"f\\\\\":(.*),\\\\\"m\\\\\":\\\\\"([\\s\\S]*?)\\\\\"\\}\"\\]$"; NSString* const MULTI_PART_MESSAGE_PATTERN = @"^(.[^_]*?)_(.[^-]*?)-(.[^_]*?)_([\\s\\S]*?)$"; NSString* const CLUSTER_RESPONSE_PATTERN = @"^var SOCKET_SERVER = \\\"(.*?)\\\";$"; @@ -231,6 +237,160 @@ - (void)connect:(NSString*) aApplicationKey authenticationToken:(NSString*) aAut } } + +- (void)publish:(NSString*)channel message:(NSString*)aMessage ttl:(NSNumber*)ttl callback:(void(^)(NSError* error, NSString* seqId))callback{ + /* + * Sanity Checks. + */ + if (!isConnected) { + [self delegateExceptionCallback:self error:[self generateError:@"Not connected"]]; + } + else if ([self isEmpty:channel]) { + [self delegateExceptionCallback:self error:[self generateError:@"Channel is null or empty"]]; + } + else if (![self ortcIsValidInput:channel]) { + [self delegateExceptionCallback:self error:[self generateError:@"Channel has invalid characters"]]; + } + else if ([self isEmpty:aMessage]) { + [self delegateExceptionCallback:self error:[self generateError:@"Message is null or empty"]]; + } + else { + + aMessage = [[aMessage stringByReplacingOccurrencesOfString:@"\\" withString:@"\\\\"] stringByReplacingOccurrencesOfString:@"\n" withString:@"\\n"]; + aMessage = [aMessage stringByReplacingOccurrencesOfString:@"\"" withString:@"\\\""]; + + NSData* channelBytes = [channel dataUsingEncoding:NSUTF8StringEncoding]; + + if (channelBytes.length >= MAX_CHANNEL_SIZE) { + [self delegateExceptionCallback:self error:[self generateError:[NSString stringWithFormat:@"Channel size exceeds the limit of %d characters", MAX_CHANNEL_SIZE]]]; + } + else { + unsigned long domainChannelIndex = (int)[channel rangeOfString:@":"].location; + NSString* channelToValidate = channel; + NSString* hashPerm = nil; + + + if (domainChannelIndex != NSNotFound) { + channelToValidate = [[channel substringToIndex:domainChannelIndex + 1] stringByAppendingString:@"*"]; + } + + if (_permissions) { + hashPerm = [_permissions objectForKey:channelToValidate] ? [_permissions objectForKey:channelToValidate] : [_permissions objectForKey:channel]; + } + + if (_permissions && !hashPerm) { + [self delegateExceptionCallback:self error:[self generateError:[NSString stringWithFormat:@"No permission found to send to the channel '%@'", channel]]]; + } + else { + + NSData* messageBytes = [NSData dataWithBytes:[aMessage UTF8String] length:[aMessage lengthOfBytesUsingEncoding:NSUTF8StringEncoding]]; + + NSMutableArray* messageParts = [[NSMutableArray alloc] init]; + unsigned long pos = 0; + unsigned long remaining; + NSString* messageId = [self generateId:8]; + + // Multi part + while ((remaining = messageBytes.length - pos) > 0) { + unsigned long arraySize = 0; + + if (remaining >= MAX_MESSAGE_SIZE - channelBytes.length) { + arraySize = MAX_MESSAGE_SIZE - ((int)channelBytes.length); + } + else { + arraySize = remaining; + } + + Byte messagePart[arraySize]; + + [messageBytes getBytes:messagePart range:NSMakeRange(pos, arraySize)]; + + [messageParts addObject:[[NSString alloc] initWithBytes:messagePart length:arraySize encoding:NSUTF8StringEncoding]]; + + pos += arraySize; + } + + __block NSString* err; + + if(_pendingPublishMessages == nil) { + _pendingPublishMessages = [[NSMutableDictionary alloc] init]; + } + + if([_pendingPublishMessages objectForKey:messageId]){ + err = @"Message id conflict. Please retry publishing the message"; + } + else { + + if(!ttl) { + ttl = 0; + } + + // check for acknowledge timeout + NSTimer* ackTimeout = [NSTimer scheduledTimerWithTimeInterval:_publishTimeout repeats:NO block:^(NSTimer * _Nonnull timer) { + if([_pendingPublishMessages objectForKey:messageId]) { + err = [NSString stringWithFormat:@"Message publish timeout after %d seconds", _publishTimeout]; + if([[_pendingPublishMessages objectForKey:messageId] objectForKey:@"callback"]) { + void (^callbackP)(NSError*, NSString*) = (void(^)(NSError*, NSString*)) [[_pendingPublishMessages objectForKey:messageId] objectForKey:@"callback"]; + callbackP([self generateError:err], nil); + [_pendingPublishMessages removeObjectForKey:messageId]; + } + [_pendingPublishMessages removeObjectForKey:messageId]; + } + }]; + + NSDictionary* pendingMsg = @{ + @"totalNumOfParts": @(messageParts.count), + @"callback": callback, + @"timeout": ackTimeout + }; + + [_pendingPublishMessages setObject:pendingMsg forKey:messageId]; + + } + + if (messageParts.count < 20) { + int counter = 1; + + for (NSString* __strong messageToSend in messageParts) { + NSString* encodedData = [[NSString alloc] initWithData:[NSData dataWithBytes:[messageToSend UTF8String] length:[messageToSend lengthOfBytesUsingEncoding:NSUTF8StringEncoding]] encoding:NSUTF8StringEncoding]; + + NSString* aString = [NSString stringWithFormat:@"\"publish;%@;%@;%@;%@;%@;%@\"", applicationKey, authenticationToken, channel, ttl, hashPerm, [[[[[[messageId stringByAppendingString:@"_"] stringByAppendingString:[NSString stringWithFormat:@"%d", counter]] stringByAppendingString:@"-"] stringByAppendingString:[NSString stringWithFormat:@"%d", ((int)messageParts.count)]] stringByAppendingString:@"_"] stringByAppendingString:encodedData]]; + + [_webSocket send:aString]; + + counter++; + } + }else{ + int counter = 1; + __block int partsSent = 0; + partSendInterval = [NSTimer scheduledTimerWithTimeInterval:100 repeats:YES block:^(NSTimer * _Nonnull timer) { + + if(isConnected && _webSocket) { + int currentPart = partsSent + 1; + int totalParts = messageParts.count; + + NSString* encodedData = [[NSString alloc] initWithData:[NSData dataWithBytes:[[messageParts objectAtIndex:currentPart] UTF8String] length:[[messageParts objectAtIndex:currentPart] lengthOfBytesUsingEncoding:NSUTF8StringEncoding]] encoding:NSUTF8StringEncoding]; + NSString* aString = [NSString stringWithFormat:@"\"publish;%@;%@;%@;%@;%@;%@\"", applicationKey, authenticationToken, channel, ttl, hashPerm, [[[[[[messageId stringByAppendingString:@"_"] stringByAppendingString:[NSString stringWithFormat:@"%d", counter]] stringByAppendingString:@"-"] stringByAppendingString:[NSString stringWithFormat:@"%d", ((int)messageParts.count)]] stringByAppendingString:@"_"] stringByAppendingString:encodedData]]; + + [_webSocket send:aString]; + partsSent++; + + if(partsSent == messageParts.count) { + [partSendInterval invalidate]; + } + } else { + // socket was disconnected, stop sending + [partSendInterval invalidate]; + } + }]; + } + } + } + } +} + +NSTimer* partSendInterval; + - (void)send:(NSString*) channel message:(NSString*) aMessage { /* @@ -337,6 +497,113 @@ - (void)subscribeWithNotifications:(NSString*) channel subscribeOnReconnected:(B [self subscribeChannel:channel WithNotifications:WITH_NOTIFICATIONS withFilter:NO filter:@"" subscribeOnReconnected:aSubscribeOnReconnected onMessage:onMessage onMessageWithFilter: nil]; } +- (void)subscribeWithOptions:(NSDictionary*)options onMessageWithOptionsCallback:(void (^)(OrtcClient* ortc, NSDictionary* msgOptions)) onMessageWithOptionsCallback{ + if(options) { + [self _subscribeOptions:[options objectForKey:@"channel"] subscribeOnReconnected:[options objectForKey:@"subscribeOnReconnected"] regId:[options objectForKey:@"regId"] filter:[options objectForKey:@"filter"] subscriberId:[options objectForKey:@"subscriberId"] onMessageWithOptionsCallback:onMessageWithOptionsCallback]; + } else { + [self delegateExceptionCallback:self error:[self generateError:@"subscribeWithOptions called with no options"]]; + } +} + + +- (void) subscribeWithBuffer:(NSString*)channel subscriberId:(NSString*)subscriberId + onMessageWithBufferCallback:(void (^)(OrtcClient* ortc, NSString* channel, NSString* seqId, NSString* message))onMessageWithBufferCallback{ + if(subscriberId) { + NSDictionary* options = @{ + @"channel": channel, + @"subscribeOnReconnected": @YES, + @"subscriberId": subscriberId + }; + + [self subscribeWithOptions:options onMessageWithOptionsCallback:^(OrtcClient *ortc, NSDictionary *msgOptions) { + onMessageWithBufferCallback(ortc, [msgOptions objectForKey:@"channel"], [msgOptions objectForKey:@"seqId"], [msgOptions objectForKey:@"message"]); + }]; + + } else { + [self delegateExceptionCallback:self error:[self generateError:@"subscribeWithBuffer called with no subscriberId"]]; + } +} + + +- (void)_subscribeOptions:(NSString*)channel subscribeOnReconnected:(BOOL)aSubscribeOnReconnected regId:(NSString*)regId filter:(NSString*)aFilter subscriberId:(NSString*)subscriberId onMessageWithOptionsCallback:(void (^)(OrtcClient* ortc, NSDictionary* msgOptions)) onMessageWithOptionsCallback { + /* + Sanity Checks + */ + if (!isConnected) { + [self delegateExceptionCallback:self error:[self generateError:@"Not connected"]]; + } + else if (!channel) { + [self delegateExceptionCallback:self error:[self generateError:@"Channel is null or empty"]]; + } + else if (![self ortcIsValidInput:channel]) { + [self delegateExceptionCallback:self error:[self generateError:@"Channel has invalid characters"]]; + } + else if (![self ortcIsValidInput:subscriberId]) { + [self delegateExceptionCallback:self error:[self generateError:@"subscriberId has invalid characters"]]; + } + else if ([_subscribedChannels objectForKey:channel] && [[_subscribedChannels objectForKey:channel] isSubscribing]) { + [self delegateExceptionCallback:self error:[self generateError:@"Already subscribing to the channel \'' + channel + '\''"]]; + } + else if ([_subscribedChannels objectForKey:channel] && [[_subscribedChannels objectForKey:channel] isSubscribed]) { + [self delegateExceptionCallback:self error:[self generateError:@"Already subscribed to the channel \'' + channel + '\'"]]; + } + else if (channel.length > MAX_CHANNEL_SIZE) { + [self delegateExceptionCallback:self error:[self generateError:@"Channel size exceeds the limit of ' + channelMaxSize + ' characters"]]; + } + else { + + if (!aSubscribeOnReconnected) { + aSubscribeOnReconnected = true; + } + + if(!regId) { + regId = @""; + } + + if(!aFilter) { + aFilter = @""; + } + + if(!subscriberId) { + subscriberId = @""; + } + + NSArray* domainChannelCharacterIndex = [channel componentsSeparatedByString:@":"]; + NSString* channelToValidate = channel; + + if (domainChannelCharacterIndex.count > 0) { + channelToValidate = [NSString stringWithFormat:@"%@*", [domainChannelCharacterIndex objectAtIndex:0]]; + } + + NSString* hashPerm = [self checkChannelPermissions:channel]; + if (!_permissions || (_permissions && hashPerm != nil)) { + if (![_subscribedChannels objectForKey:channel]) { + // Instantiate ChannelSubscription + ChannelSubscription* channelSubscription = [[ChannelSubscription alloc] init]; + + // Set channelSubscription properties + channelSubscription.isSubscribing = YES; + channelSubscription.isSubscribed = NO; + channelSubscription.subscribeOnReconnected = aSubscribeOnReconnected; + channelSubscription.withOptions = YES; + channelSubscription.onMessageWithOptions = [onMessageWithOptionsCallback copy]; + channelSubscription.subscriberId = subscriberId; + channelSubscription.regId = regId; + channelSubscription.withNotifications = ([regId isEqualToString:@""] ? NO : YES); + // Add to subscribed channels dictionary + [_subscribedChannels setObject:channelSubscription forKey:channel]; + } + + NSString* aString = nil; + aString = [NSString stringWithFormat:@"\"subscribeoptions;%@;%@;%@;%@;%@;%@;%@;%@\"", applicationKey, authenticationToken, channel, subscriberId, regId, PLATFORM, hashPerm, aFilter]; + + if (![self isEmpty:aString]) { + [_webSocket send:aString]; + } + } + } +} + - (void)subscribeChannel:(NSString*) channel WithNotifications:(BOOL) withNotifications withFilter:(BOOL) withFilter filter:(NSString*)aFilter subscribeOnReconnected:(BOOL) aSubscribeOnReconnected onMessage:(void (^)(OrtcClient* ortc, NSString* channel, NSString* message)) onMessage onMessageWithFilter:(void (^)(OrtcClient* ortc, NSString* channel, BOOL filtered, NSString* message)) onMessageWithFilter @@ -355,6 +622,7 @@ - (void)subscribeChannel:(NSString*) channel WithNotifications:(BOOL) withNotifi channelSubscription.subscribeOnReconnected = aSubscribeOnReconnected; channelSubscription.withFilter = withFilter; channelSubscription.filter = aFilter; + channelSubscription.subscriberId = @""; if (withFilter) { channelSubscription.onMessageWithFilter = [onMessageWithFilter copy]; @@ -367,6 +635,8 @@ - (void)subscribeChannel:(NSString*) channel WithNotifications:(BOOL) withNotifi [_subscribedChannels setObject:channelSubscription forKey:channel]; } + ChannelSubscription* channelSubscription = [_subscribedChannels objectForKey:channel]; + NSString* aString = nil; if (withNotifications) { if (![self isEmpty:[OrtcClient getDEVICE_TOKEN]]) { @@ -378,6 +648,9 @@ - (void)subscribeChannel:(NSString*) channel WithNotifications:(BOOL) withNotifi } } else if(withFilter){ aString = [NSString stringWithFormat:@"\"subscribefilter;%@;%@;%@;%@;%@\"", applicationKey, authenticationToken, channel, hashPerm, aFilter]; + } else if (channelSubscription.withOptions){ + NSString* aString = nil; + aString = [NSString stringWithFormat:@"\"subscribeoptions;%@;%@;%@;%@;%@;%@;%@;%@\"", applicationKey, authenticationToken, channel, channelSubscription.subscriberId, channelSubscription.regId, PLATFORM, hashPerm, aFilter]; } else { aString = [NSString stringWithFormat:@"\"subscribe;%@;%@;%@;%@\"", applicationKey, authenticationToken, channel, hashPerm]; @@ -934,6 +1207,11 @@ - (void)parseReceivedMessage:(NSString*) aMessage [self opException:arguments]; } break; + case opAck: + if (arguments) { + [self opAck:aMessage]; + } + break; default: [self delegateExceptionCallback:self error:[self generateError:[NSString stringWithFormat:@"Unknown message received: %@", aMessage]]]; break; @@ -1040,6 +1318,8 @@ - (void)opValidated:(NSString*) message { } }else if(channelSubscription.withFilter){ aString = [NSString stringWithFormat:@"\"subscribefilter;%@;%@;%@;%@;%@\"", applicationKey, authenticationToken, channel, hashPerm, channelSubscription.filter]; + } else if (channelSubscription.withOptions){ + aString = [NSString stringWithFormat:@"\"subscribeoptions;%@;%@;%@;%@;%@;%@;%@;%@\"", applicationKey, authenticationToken, channel, channelSubscription.subscriberId, channelSubscription.regId, PLATFORM, hashPerm, channelSubscription.filter?channelSubscription.filter:@""]; } else { aString = [NSString stringWithFormat:@"\"subscribe;%@;%@;%@;%@\"", applicationKey, authenticationToken, channel, hashPerm]; @@ -1088,7 +1368,7 @@ - (void)opSubscribed:(NSString*) message { channel = [message substringWithRange:strRangeChn]; } - if (channel) { + if (channel && ((ChannelSubscription*) [_subscribedChannels objectForKey:channel]).isSubscribed == NO) { ChannelSubscription* channelSubscription = [_subscribedChannels objectForKey:channel]; channelSubscription.isSubscribing = NO; @@ -1120,6 +1400,40 @@ - (void)opUnsubscribed:(NSString*) message { } } +- (void)opAck:(NSString*) message{ + NSRegularExpression* recJSONRegex = [NSRegularExpression regularExpressionWithPattern:JSON_PATTERN options:0 error:NULL]; + NSTextCheckingResult* recJSONMatch = [recJSONRegex firstMatchInString:message options:0 range:NSMakeRange(0, [message length])]; + + NSString* aJSON = nil; + + NSRange strRangeJSON = [recJSONMatch rangeAtIndex:1]; + + if (strRangeJSON.location != NSNotFound) { + message = [message substringWithRange:strRangeJSON]; + message = [self simulateJsonParse:message]; + } + + NSError *jsonError; + NSData *objectData = [message dataUsingEncoding:NSUTF8StringEncoding]; + NSDictionary *json = [NSJSONSerialization JSONObjectWithData:objectData + options:NSJSONReadingMutableContainers + error:&jsonError]; + + if ([json objectForKey:@"m"] && [json objectForKey:@"seq"]) { + NSDictionary* pendingMsg = [_pendingPublishMessages objectForKey:[json objectForKey:@"m"]]; + + NSTimer* timeout = [pendingMsg objectForKey:@"timeout"]; + [timeout invalidate]; + + void (^callback)(NSError*, NSString*) = [pendingMsg objectForKey:@"callback"]; + if ([json objectForKey:@"seq"]) { + callback(nil, [json objectForKey:@"seq"]); + } + + [_pendingPublishMessages removeObjectForKey:[json objectForKey:@"m"]]; + } +} + - (void)opException:(NSString*) message { NSRegularExpression* exRegex = [NSRegularExpression regularExpressionWithPattern:EXCEPTION_PATTERN options:0 error:NULL]; NSTextCheckingResult* exMatch = [exRegex firstMatchInString:message options:0 range:NSMakeRange(0, [message length])]; @@ -1197,26 +1511,46 @@ - (void)opException:(NSString*) message { } - (void)opReceive:(NSString*) message { - NSRegularExpression* recRegex = [NSRegularExpression regularExpressionWithPattern:RECEIVED_PATTERN options:0 error:NULL]; - NSTextCheckingResult* recMatch = [recRegex firstMatchInString:message options:0 range:NSMakeRange(0, [message length])]; - NSRegularExpression* recRegexFiltered = [NSRegularExpression regularExpressionWithPattern:RECEIVED_PATTERN_FILTERED options:0 error:NULL]; - NSTextCheckingResult* recMatchFiltered = [recRegexFiltered firstMatchInString:message options:0 range:NSMakeRange(0, [message length])]; + NSRegularExpression* recJSONRegex = [NSRegularExpression regularExpressionWithPattern:JSON_PATTERN options:0 error:NULL]; + NSTextCheckingResult* recJSONMatch = [recJSONRegex firstMatchInString:message options:0 range:NSMakeRange(0, [message length])]; + + NSString* aJSON = nil; - if (recMatch) + NSRange strRangeJSON = [recJSONMatch rangeAtIndex:1]; + + if (strRangeJSON.location != NSNotFound) { + message = [message substringWithRange:strRangeJSON]; + message = [self simulateJsonParse:message]; + } + + NSError *jsonError; + NSData *objectData = [message dataUsingEncoding:NSUTF8StringEncoding]; + NSDictionary *json = [NSJSONSerialization JSONObjectWithData:objectData + options:NSJSONReadingMutableContainers + error:&jsonError]; + + if (json) { NSString* aChannel = nil; NSString* aMessage = nil; + NSString* aFiltered = nil; + NSString* aSeqId = nil; - NSRange strRangeChn = [recMatch rangeAtIndex:1]; - NSRange strRangeMsg = [recMatch rangeAtIndex:2]; + if ([json objectForKey:@"ch"]) { + aChannel = [json objectForKey:@"ch"]; + } - if (strRangeChn.location != NSNotFound) { - aChannel = [message substringWithRange:strRangeChn]; + if ([json objectForKey:@"m"]) { + aMessage = [json objectForKey:@"m"]; + } + + if ([json objectForKey:@"f"]) { + aFiltered = [json objectForKey:@"f"]; } - if (strRangeMsg.location != NSNotFound) { - aMessage = [message substringWithRange:strRangeMsg]; + if ([json objectForKey:@"s"]) { + aSeqId = [json objectForKey:@"s"]; } if (aChannel && aMessage) { @@ -1304,124 +1638,33 @@ - (void)opReceive:(NSString*) message { [messagesBuffer setObject:msgSentDict forKey:messageId]; } - aMessage = [self escapeRecvChars:aMessage]; + //aMessage = [self escapeRecvChars:aMessage]; aMessage = [self checkForEmoji:aMessage]; - channelSubscription.onMessage(self, aChannel, aMessage); - } - } - } - }else if (recMatchFiltered){ - NSString* aChannel = nil; - NSString* aMessage = nil; - NSString* aFiltered = nil; - - NSRange strRangeChn = [recMatchFiltered rangeAtIndex:1]; - NSRange strRangeFiltered = [recMatchFiltered rangeAtIndex:2]; - NSRange strRangeMsg = [recMatchFiltered rangeAtIndex:3]; - - if (strRangeChn.location != NSNotFound) { - aChannel = [message substringWithRange:strRangeChn]; - } - - if (strRangeFiltered.location != NSNotFound) { - aFiltered = [message substringWithRange:strRangeFiltered]; - } - - if (strRangeMsg.location != NSNotFound) { - aMessage = [message substringWithRange:strRangeMsg]; - } - - if (aChannel && aMessage && aFiltered) { - //aMessage = [[[aMessage stringByReplacingOccurrencesOfString:@"\\\\n" withString:@"\n"] stringByReplacingOccurrencesOfString:@"\\\\\"" withString:@"\""] stringByReplacingOccurrencesOfString:@"\\\\\\\\" withString:@"\\"]; - - // Multi part - NSRegularExpression* msgRegex = [NSRegularExpression regularExpressionWithPattern:MULTI_PART_MESSAGE_PATTERN options:0 error:NULL]; - NSTextCheckingResult* multiMatch = [msgRegex firstMatchInString:aMessage options:0 range:NSMakeRange(0, [aMessage length])]; - - NSString* messageId = nil; - int messageCurrentPart = 1; - int messageTotalPart = 1; - BOOL lastPart = NO; - - if (multiMatch) - { - NSRange strRangeMsgId = [multiMatch rangeAtIndex:1]; - NSRange strRangeMsgCurPart = [multiMatch rangeAtIndex:2]; - NSRange strRangeMsgTotPart = [multiMatch rangeAtIndex:3]; - NSRange strRangeMsgRec = [multiMatch rangeAtIndex:4]; - - if (strRangeMsgId.location != NSNotFound) { - messageId = [aMessage substringWithRange:strRangeMsgId]; - } - - if (strRangeMsgCurPart.location != NSNotFound) { - messageCurrentPart = [[aMessage substringWithRange:strRangeMsgCurPart] intValue]; - } - - if (strRangeMsgTotPart.location != NSNotFound) { - messageTotalPart = [[aMessage substringWithRange:strRangeMsgTotPart] intValue]; - } - - if (strRangeMsgRec.location != NSNotFound) { - aMessage = [aMessage substringWithRange:strRangeMsgRec]; - //code below written by Rafa, gives a bug for a meesage containing % character - //aMessage = [[aMessage substringWithRange:strRangeMsgRec] stringByReplacingPercentEscapesUsingEncoding:NSUTF8StringEncoding]; - } - } - // Is a message part - if (![self isEmpty:messageId]) { - if (![messagesBuffer objectForKey:messageId]) { - NSMutableDictionary *msgSentDict = [[NSMutableDictionary alloc] init]; - [msgSentDict setObject:[NSNumber numberWithBool:NO] forKey:@"isMsgSent"]; - [messagesBuffer setObject:msgSentDict forKey:messageId]; - } - - NSMutableDictionary* messageBufferId = [messagesBuffer objectForKey:messageId]; - [messageBufferId setObject:aMessage forKey:[NSString stringWithFormat:@"%d", messageCurrentPart]]; - - // Last message part -1 isMsgSent Key - if (([[messageBufferId allKeys] count] -1) == messageTotalPart) { - lastPart = YES; + if (channelSubscription.withFilter) { + channelSubscription.onMessageWithFilter(self, aChannel, [aFiltered boolValue], aMessage); + }else if (channelSubscription.withOptions){ + NSMutableDictionary *dataResult = [[NSMutableDictionary alloc] init]; + [dataResult setObject:aChannel forKey:@"channel"]; + [dataResult setObject:aMessage forKey:@"message"]; + if (aFiltered) { + [dataResult setObject:aFiltered forKey:@"filter"]; + } + if (aSeqId) { + [dataResult setObject:aSeqId forKey:@"seqId"]; + } + channelSubscription.onMessageWithOptions(self, dataResult); + }else if(channelSubscription.onMessage){ + channelSubscription.onMessage(self, aChannel, aMessage); + } } } - // Message does not have multipart, like the messages received at announcement channels - else { - lastPart = YES; - } - if (lastPart) { - if (![self isEmpty:messageId]) { - aMessage = @""; - NSMutableDictionary* messageBufferId = [messagesBuffer objectForKey:messageId]; - - for (int i = 1; i <= messageTotalPart; i++) { - NSString* messagePart = [messageBufferId objectForKey:[NSString stringWithFormat:@"%d", i]]; - - aMessage = [aMessage stringByAppendingString:messagePart]; - // Delete from messages buffer - [messageBufferId removeObjectForKey:[NSString stringWithFormat:@"%d", i]]; - } - } - - if ([messagesBuffer objectForKey:messageId] && [[[messagesBuffer objectForKey:messageId] objectForKey:@"isMsgSent"] boolValue]) { - [messagesBuffer removeObjectForKey:messageId]; - } - else if ([_subscribedChannels objectForKey:aChannel]) { - ChannelSubscription* channelSubscription = [_subscribedChannels objectForKey:aChannel]; - - if (![self isEmpty:messageId]) { - NSMutableDictionary *msgSentDict = [messagesBuffer objectForKey:messageId]; - [msgSentDict setObject:[NSNumber numberWithBool:YES] forKey:@"isMsgSent"]; - [messagesBuffer setObject:msgSentDict forKey:messageId]; - } - - aMessage = [self escapeRecvChars:aMessage]; - - aMessage = [self checkForEmoji:aMessage]; - channelSubscription.onMessageWithFilter(self, aChannel, [aFiltered boolValue], aMessage); - } + if(messageId && aSeqId) { + NSString* haveAllParts = lastPart ? @"1" : @"0"; + NSString* ack = [NSString stringWithFormat:@"\"ack;%@;%@;%@;%@;%@\"",applicationKey, aChannel, messageId, aSeqId, haveAllParts]; + [_webSocket send:ack]; } } } @@ -1502,8 +1745,6 @@ - (NSString*)simulateJsonParse:(NSString*)str{ } } - - return ms ; } @@ -1555,15 +1796,15 @@ - (void)processDisconnect:(BOOL) callDisconnectedCallback _webSocket.delegate = nil; [_webSocket close]; - if (callDisconnectedCallback) { - [self delegateDisconnectedCallback:self]; - } - isConnected = NO; isConnecting = NO; // Clear user permissions _permissions = nil; + + if (callDisconnectedCallback) { + [self delegateDisconnectedCallback:self]; + } } - (void)processConnect:(id) sender @@ -1826,6 +2067,7 @@ - (id)initWithConfig:(id) aDelegate [opCases setObject:[NSNumber numberWithInt:opSubscribe] forKey:@"ortc-subscribed"]; [opCases setObject:[NSNumber numberWithInt:opUnsubscribe] forKey:@"ortc-unsubscribed"]; [opCases setObject:[NSNumber numberWithInt:opException] forKey:@"ortc-error"]; + [opCases setObject:[NSNumber numberWithInt:opAck] forKey:@"ortc-ack"]; } if (errCases == nil) { @@ -1842,6 +2084,7 @@ - (id)initWithConfig:(id) aDelegate _ortcDelegate = aDelegate; connectionTimeout = 5; // seconds + _publishTimeout = 5; sessionExpirationTime = 30; // minutes isConnected = NO; diff --git a/RealtimeMessaging-iOS.podspec b/RealtimeMessaging-iOS.podspec index 8a00373..1890d81 100644 --- a/RealtimeMessaging-iOS.podspec +++ b/RealtimeMessaging-iOS.podspec @@ -9,7 +9,7 @@ Pod::Spec.new do |s| s.name = "RealtimeMessaging-iOS" - s.version = "2.1.36" + s.version = "2.1.38" s.summary = "Realtime Cloud Messaging (ORTC) SDK for iOS" s.description = <<-DESC Part of the The Realtime® Framework, Realtime Cloud Messaging (aka ORTC) is a secure, fast and highly scalable cloud-hosted Pub/Sub real-time message broker for web and mobile apps.