Skip to content

Commit

Permalink
Fix iOS push message bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
wubenqi committed Dec 22, 2018
1 parent 229a445 commit 265aa70
Show file tree
Hide file tree
Showing 23 changed files with 97,647 additions and 97,494 deletions.
62 changes: 12 additions & 50 deletions access/session/server/auth_sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func makeClientConnID(connType int, clientConnID, frontendConnID uint64) ClientC
}

func (c ClientConnID) String() string {
return fmt.Sprintf("{conn_type: %d, client_conn_id: %d, frontend_conn_id: %d}", c.connType, c.clientConnID, c.frontendConnID)
return fmt.Sprintf("%d@%d.%d", c.connType, c.clientConnID, c.frontendConnID)
}

func (c ClientConnID) Equal(id ClientConnID) bool {
Expand Down Expand Up @@ -216,6 +216,12 @@ func (s *authSessions) setLayer(layer int32) {

func (s *authSessions) destroySession(sessionId int64) bool {
// TODO(@benqi):
if sess, ok := s.sessions[sessionId]; ok {
s.updates.onGenericSessionClose(sess)
delete(s.sessions, sessionId)
} else {
//
}
return true
}

Expand Down Expand Up @@ -448,7 +454,7 @@ func (s *authSessions) onSessionData(sessionMsg *sessionData) {
authKey := getCacheAuthKey(s.authKeyId)
if authKey == nil {
// err := fmt.Errorf("onSessionData - not found authKeyId")
glog.Errorf("onSessionData - error: {not found authKeyId}, data: {sess: %s, conn_id: %s, md: %s}", s, sessionMsg.connID, sessionMsg.cntl)
glog.Errorf("onSessionData - error: {not found authKeyId}, data: {sessions: %s, conn_id: %s, md: %s}", s, sessionMsg.connID, sessionMsg.cntl)
return
} else {
s.onBindAuthKey(authKey)
Expand All @@ -459,14 +465,12 @@ func (s *authSessions) onSessionData(sessionMsg *sessionData) {
err = message.Decode(s.authKeyId, s.authKey, sessionMsg.buf[8:])
if err != nil {
// TODO(@benqi): close frontend conn??
glog.Error(err)
glog.Errorf("onSessionData - error: {%v}, data: {sess: %s, conn_id: %s, md: %s}", err, s, sessionMsg.connID, sessionMsg.cntl)
// glog.Error(err)
glog.Errorf("onSessionData - error: {%s}, data: {sessions: %s, conn_id: %s, md: %s}", err, s, sessionMsg.connID, sessionMsg.cntl)

return
}

glog.Infof("onSessionData - message: {%s}, data: {sess: %s, conn_id: %s, md: %s}", message, s, sessionMsg.connID, sessionMsg.cntl)

if s.cacheSalt == nil {
s.cacheSalt, s.cacheLastSalt, _ = getOrFetchNewSalt(s.authKeyId)
} else {
Expand All @@ -476,18 +480,12 @@ func (s *authSessions) onSessionData(sessionMsg *sessionData) {
}

if s.cacheSalt == nil {
glog.Errorf("onSessionData - getOrFetchNewSalt nil error, data: {sess: %s, conn_id: %s, md: %s}", s, sessionMsg.connID, sessionMsg.cntl)
glog.Errorf("onSessionData - getOrFetchNewSalt nil error, data: {sessions: %s, conn_id: %s, md: %s}", s, sessionMsg.connID, sessionMsg.cntl)
return
}

sess := s.getOrCreateSession(sessionMsg.connID, message.SessionId, message.Object)
//if sess.SessionType() == kSessionUnknown && !sess.sessionOnline() {
// pushSessionId := s.getPushSessionId() // getCachePushSessionID(s.AuthUserId, s.authKeyId)
// if pushSessionId != 0 && message.SessionId == pushSessionId {
// s.onBindPushSessionId(pushSessionId)
// sess = s.sessions[message.SessionId]
// }
//}
glog.Infof("onSessionData - message: {%s}, data: {sess: %s, sessions: %s, conn_id: %s, md: %s}", message, s, sess, sessionMsg.connID, sessionMsg.cntl)

message2 := &mtproto.TLMessage2{
MsgId: message.MessageId,
Expand Down Expand Up @@ -643,41 +641,5 @@ func (s *authSessions) getOrCreateSession(connId ClientConnID, sessionId int64,
}
}

//if sessType != kSessionUnknown {
// sess2 := newSession(sessionId, sessType, s)
//
// if sess != nil {
// sess2.MergeSession(sess)
// }
//
// if sess2.SessionType() == kSessionGeneric {
// s.updates.onGenericSessionNew(sess2)
// } else if sess2.SessionType() == kSessionPush {
// s.updates.onPushSessionNew(sess2)
// }
//
// s.sessions[sessionId] = sess2
// sess = sess2
//} else {
// pushSessionId := s.getPushSessionId()
//
// if sess == nil {
// if pushSessionId == sessionId {
// sess = newSession(sessionId, kSessionPush, s)
// s.updates.onPushSessionNew(sess)
// } else {
// sess = newSession(sessionId, kSessionUnknown, s)
// }
// // sess.onNewSession(connId, sessionId)
// s.sessions[sessionId] = sess
// } else {
// if pushSessionId == sessionId {
//
// }
// // nothing do
// }
//}
// glog.Info("getSessionType2 - ", reflect.TypeOf(request), ", sessType: ", sessType, ", sess: ", sess)

return sess
}
57 changes: 33 additions & 24 deletions access/session/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,25 +136,28 @@ func (s *SessionServer) OnServerNewConnection(conn *net2.TcpConnection) {
}

func (s *SessionServer) OnServerMessageDataArrived(conn *net2.TcpConnection, cntl *zrpc.ZRpcController, msg proto.Message) error {
glog.Infof("onServerMessageDataArrived - receive data: {peer: %s, cntl: %s, msg: %s}", conn, cntl.RpcMeta, msg)
// glog.Infof("onServerMessageDataArrived - receive data: {peer: %s, cntl: %s, msg: %s}", conn, cntl.RpcMeta, msg)
switch msg.(type) {
case *mtproto.TLSessionClientCreated:
glog.Info("onSessionClientNew - sessionClientNew: ", conn)
// glog.Info("onSessionClientNew - sessionClientNew: ", conn)
return s.onSessionClientNew(conn.GetConnID(), cntl, msg.(*mtproto.TLSessionClientCreated))
case *mtproto.TLSessionMessageData:
return s.onSessionData(conn.GetConnID(), cntl, msg.(*mtproto.TLSessionMessageData))
case *mtproto.TLSessionClientClosed:
glog.Info("onSessionClientClosed - sessionClientClosed: ", conn)
// glog.Info("onSessionClientClosed - sessionClientClosed: ", conn)
return s.onSessionClientClosed(conn.GetConnID(), cntl, msg.(*mtproto.TLSessionClientClosed))
case *mtproto.TLPushConnectToSessionServer:
glog.Infof("onPushConnectToSessionServer - request(ConnectToSessionServerReq): {%v}", msg)
// glog.Infof("onPushConnectToSessionServer - request(ConnectToSessionServerReq): {%v}", msg)
pushSessionServerConnected := &mtproto.TLPushSessionServerConnected{Data2: &mtproto.ServerConnected_Data{
SessionServerId: getServerID(),
ServerName: "session",
}}
serverConnected := pushSessionServerConnected.To_ServerConnected()
cntl.SetMethodName(proto.MessageName(serverConnected))
zrpc.SendMessageByConn(conn, cntl, serverConnected)

cntl2 := cntl.Clone()
cntl2.MoveAttachment()
cntl2.SetMethodName(proto.MessageName(serverConnected))
zrpc.SendMessageByConn(conn, cntl2, serverConnected)
case *mtproto.TLPushPushRpcResultData:
pushData, _ := msg.(*mtproto.TLPushPushRpcResultData)

Expand All @@ -165,11 +168,14 @@ func (s *SessionServer) OnServerMessageDataArrived(conn *net2.TcpConnection, cnt
} else {
mBool = mtproto.ToBool(true)
}
cntl.SetMethodName(proto.MessageName(mBool))
zrpc.SendMessageByConn(conn, cntl, mBool)

cntl2 := cntl.Clone()
cntl2.MoveAttachment()
cntl2.SetMethodName(proto.MessageName(mBool))
zrpc.SendMessageByConn(conn, cntl2, mBool)
case *mtproto.TLPushPushUpdatesData:
pushData, _ := msg.(*mtproto.TLPushPushUpdatesData)
glog.Info("pushData - ", pushData)
// glog.Info("pushData - ", pushData)
// isPush := pushData.GetIsPush() == 1
err := s.onSyncData(pushData.GetAuthKeyId(), pushData.Pts, pushData.PtsCount, cntl)
var mBool *mtproto.Bool
Expand All @@ -178,8 +184,11 @@ func (s *SessionServer) OnServerMessageDataArrived(conn *net2.TcpConnection, cnt
} else {
mBool = mtproto.ToBool(true)
}
cntl.SetMethodName(proto.MessageName(mBool))
zrpc.SendMessageByConn(conn, cntl, mBool)

cntl2 := cntl.Clone()
cntl2.MoveAttachment()
cntl2.SetMethodName(proto.MessageName(mBool))
zrpc.SendMessageByConn(conn, cntl2, mBool)
default:
err := fmt.Errorf("invalid payload type: %v", msg)
glog.Error(err)
Expand Down Expand Up @@ -208,7 +217,7 @@ func (s *SessionServer) OnServerConnectionClosed(conn *net2.TcpConnection) {

////////////////////////////////////////////////////////////////////////////////////////////////////////
func (s *SessionServer) onSessionClientNew(connID uint64, cntl *zrpc.ZRpcController, sessData *mtproto.TLSessionClientCreated) error {
glog.Infof("onSessionClientNew - receive data: {client_conn_id: %s, md: %s, sess_data: %s}", connID, cntl.RpcMeta, sessData)
// glog.Infof("onSessionClientNew - receive data: {client_conn_id: %s, md: %s, sess_data: %s}", connID, cntl.RpcMeta, sessData)

//authKeyId := sessData.GetAuthKeyId()
//var sessList *authSessions
Expand All @@ -221,49 +230,49 @@ func (s *SessionServer) onSessionClientNew(connID uint64, cntl *zrpc.ZRpcControl
// sessList, _ = vv.(*authSessions)
//}
//
//clientConnID := makeClientConnID(int(sessData.GetConnType()), connID, uint64(sessData.GetClientConnId()))
clientConnID := makeClientConnID(int(sessData.GetConnType()), connID, uint64(sessData.GetClientConnId()))
glog.Infof("onSessionClientNew - ID: {conn_id: %s, auth_key_id: %d}", clientConnID, sessData.GetAuthKeyId())
//return sessList.onSessionClientNew(clientConnID)
return nil
}

////////////////////////////////////////////////////////////////////////////////////////////////////////
func (s *SessionServer) onSessionData(connID uint64, cntl *zrpc.ZRpcController, sessData *mtproto.TLSessionMessageData) error {
glog.Infof("onSessionData - receive data: {conn_id: %d, md: %s, sess_data: %s}",
connID,
cntl.RpcMeta,
sessData)
//glog.Infof("onSessionData - receive data: {conn_id: %d, md: %s, sess_data: %s}",
// connID,
// cntl.RpcMeta,
// sessData)
clientConnID := makeClientConnID(int(sessData.GetConnType()), connID, uint64(sessData.GetClientConnId()))

authKeyId := sessData.GetAuthKeyId()
var sessList *authSessions
if vv, ok := s.sessionManager.Load(authKeyId); !ok {
glog.Infof("onSessionDataNew - ID: {conn_id: %s, auth_key_id: %d}", clientConnID, sessData.GetAuthKeyId())

sessList = makeAuthSessions(authKeyId)
s.sessionManager.Store(authKeyId, sessList)
s.onNewSessionClientManager(sessList)
} else {
sessList, _ = vv.(*authSessions)
}

clientConnID := makeClientConnID(int(sessData.GetConnType()), connID, uint64(sessData.GetClientConnId()))
return sessList.onSessionDataArrived(clientConnID, cntl, cntl.MoveAttachment())
}

func (s *SessionServer) onSessionClientClosed(connID uint64, cntl *zrpc.ZRpcController, sessData *mtproto.TLSessionClientClosed) error {
glog.Infof("onSessionClientClosed - receive data: {client_conn_id: %d, md: %s, sess_data: %s}",
connID,
cntl,
sessData)
clientConnID := makeClientConnID(int(sessData.GetConnType()), connID, uint64(sessData.GetClientConnId()))
glog.Infof("onSessionClientClosed - ID: {conn_id: %s, auth_key_id: %d}", clientConnID, sessData.GetAuthKeyId())

var sessList *authSessions

if vv, ok := s.sessionManager.Load(sessData.GetAuthKeyId()); !ok {
err := fmt.Errorf("onSessionClientClosed - not find sessionList by authKeyId: {%d}", sessData.GetAuthKeyId())
err := fmt.Errorf("onSessionClientClosed - not find sessionList by ID: {conn_id: %s, auth_key_id: %d}", clientConnID, sessData.GetAuthKeyId())
glog.Warning(err)
return err
} else {
sessList, _ = vv.(*authSessions)
}

clientConnID := makeClientConnID(int(sessData.GetConnType()), connID, uint64(sessData.GetClientConnId()))
return sessList.onSessionClientClosed(clientConnID)
}

Expand Down
6 changes: 4 additions & 2 deletions access/session/server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,12 @@ func (c *session) processMessageData(id ClientConnID, cntl *zrpc.ZRpcController,

// 1. check salt
if !c.checkBadServerSalt(id, cntl, salt, msg) {
glog.Infof("salt invalid - {sess: %s, conn_id: %s, md: %s}", c, id, cntl)
// glog.Infof("salt invalid - {sess: %s, conn_id: %s, md: %s}", c, id, cntl)
return
}

c.closeDate = time.Now().Unix() + kDefaultPingTimeout + kPingAddTimeout

//if !c.checkBadMsgNotification(id, cntl, msg) {
// glog.Infof("badMsgNotification - {sess: %s, conn_id: %s, md: %s}", c, id, cntl)
// return
Expand Down Expand Up @@ -931,7 +933,7 @@ func (c *session) onPing(connID ClientConnID, cntl *zrpc.ZRpcController, msgId i
}}

c.pendingMessages = append(c.pendingMessages, makePendingMessage(0, false, pong))
c.closeDate = time.Now().Unix() + kDefaultPingTimeout + kPingAddTimeout
// c.closeDate = time.Now().Unix() + kDefaultPingTimeout + kPingAddTimeout
}

func (c *session) onPingDelayDisconnect(connID ClientConnID, cntl *zrpc.ZRpcController, msgId int64, seqNo int32, pingDelayDisconnect *mtproto.TLPingDelayDisconnect) {
Expand Down
10 changes: 8 additions & 2 deletions access/session/server/updates_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,14 @@ func (m *updatesManager) onGenericSessionNew(s sessionBase) {
m.genericSessions.PushBack(ss)
}

func (m *updatesManager) onGenericSessionClose() {
// m.genericSession = nil
func (m *updatesManager) onGenericSessionClose(s sessionBase) {
ss := s.(*genericSession)
for e := m.genericSessions.Front(); e != nil; e = e.Next() {
if ss.SessionId() == e.Value.(*genericSession).sessionId {
m.genericSessions.Remove(e)
break
}
}
}

func (m *updatesManager) onPushSessionNew(s sessionBase) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2018-present, NebulaChat Studio (https://nebula.chat).
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package messages

import (
"github.com/golang/glog"
"github.com/nebula-chat/chatengine/mtproto"
"github.com/nebula-chat/chatengine/pkg/grpc_util"
"github.com/nebula-chat/chatengine/pkg/logger"
"golang.org/x/net/context"
)

// messages.getRecentLocations#249431e2 peer:InputPeer limit:int = messages.Messages;
func (s *MessagesServiceImpl) MessagesGetRecentLocationsLayer72(ctx context.Context, request *mtproto.TLMessagesGetRecentLocationsLayer72) (*mtproto.Messages_Messages, error) {
md := grpc_util.RpcMetadataFromIncoming(ctx)
glog.Infof("messages.getRecentLocations#249431e2 - metadata: %s, request: %s", logger.JsonDebugData(md), logger.JsonDebugData(request))

// TODO(@benqi): Impl MessagesSearchGlobal logic
messages := &mtproto.TLMessagesMessages{Data2: &mtproto.Messages_Messages_Data{
Messages: []*mtproto.Message{},
Chats: []*mtproto.Chat{},
Users: []*mtproto.User{},
}}

glog.Infof("messages.getRecentLocations#249431e2 - reply: %s", logger.JsonDebugData(messages))
return messages.To_Messages_Messages(), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ package messages

import (
"github.com/golang/glog"
"github.com/nebula-chat/chatengine/mtproto"
"github.com/nebula-chat/chatengine/pkg/grpc_util"
"github.com/nebula-chat/chatengine/pkg/logger"
"github.com/nebula-chat/chatengine/mtproto"
"golang.org/x/net/context"
)

// messages.getRecentLocations#249431e2 peer:InputPeer limit:int = messages.Messages;
// messages.getRecentLocations#bbc45b09 peer:InputPeer limit:int hash:int = messages.Messages;
func (s *MessagesServiceImpl) MessagesGetRecentLocations(ctx context.Context, request *mtproto.TLMessagesGetRecentLocations) (*mtproto.Messages_Messages, error) {
md := grpc_util.RpcMetadataFromIncoming(ctx)
glog.Infof("messages.getRecentLocations#249431e2 - metadata: %s, request: %s", logger.JsonDebugData(md), logger.JsonDebugData(request))
glog.Infof("messages.getRecentLocations#bbc45b09 - metadata: %s, request: %s", logger.JsonDebugData(md), logger.JsonDebugData(request))

// TODO(@benqi): Impl MessagesSearchGlobal logic
messages := &mtproto.TLMessagesMessages{Data2: &mtproto.Messages_Messages_Data{
Expand All @@ -37,6 +37,6 @@ func (s *MessagesServiceImpl) MessagesGetRecentLocations(ctx context.Context, re
Users: []*mtproto.User{},
}}

glog.Infof("messages.getRecentLocations#249431e2 - reply: %s", logger.JsonDebugData(messages))
glog.Infof("messages.getRecentLocations#bbc45b09 - reply: %s", logger.JsonDebugData(messages))
return messages.To_Messages_Messages(), nil
}
Loading

0 comments on commit 265aa70

Please sign in to comment.