【B2B研发商城】 【加入收藏】 【设为首页】 【进入论坛】 【站点地图】

你的位置:中国研发网 >> 专题频道 >> 流媒体开发 >> 详细内容 在线投稿

流媒体服务器通信部分的核心代码实现。

热度23票  浏览68次 【共0条评论】【我要评论 时间:2010年8月05日 12:28
本篇文章将介绍服务器通信部分的核心代码实现。

    首先从整体来看,服务器在启动时要创建套接字并开始不断地进行侦听,一旦有新的客户端连接,将会创建一个客户端会话实例并通过该会话实例管理自己的套接字,同时采用异步方式来实现数据的接收与发送。在这里需要说明的是,为了使各自的职责更清晰,笔者特别对每一个客户端采用两个套接字,一个套接字专门用于与服务器的普通通信(如各种消息的收发等),另一个套接字专门用于发送或接收流数据。当然,也许这样设计可能会存在一定的缺陷,但姑且先这样去实现,若有可能,待日后有更好的思路时再做进一步改进。

    根据上面所述的基本思路,要编码实现服务器的通信部分,需要完成以下工作:

    1、新建一个RtspServer类来负责服务器的启动、停止、同时维护客户端会话列表等;

    2、新建一个ServerSocket类来创建服务器套接字、侦听连接请求、关闭服务器套接字和客户端套接字等;

    3、新建一个ClientSession类来负责处理每一个客户端会话的请求等,该类继承于ClientSessionInfo类(专门用于维护客户端会话基本信息的类);

    4、新建一个ClientSocket类来进行异步接收和发送数据;

    5、采用事件来处理RtspServer与UI的交互、ServerSocket与RtspServer的交互、ClientSession与RtspServer的交互、ClientSocket与ClientSession的交互等。

    6、RtspCommon类用来处理RTSP的请求数据(如解析URL等),Utils为公共使用的工具类,Constants为常量类。

    根据上面所述,初步的类关系图如下图所示:


    除了主线程之外,使用了线程池来管理侦听和定时检查每一个客户端会话的状态的工作。这两个任务在启动服务器时加入线程池,代码示例如下:
view plaincopy to clipboardprint?
/// <summary>   
/// Starts the server.   
/// </summary>   
/// <returns>success or failed</returns>   
public bool StartServer()   
{   
    if (!m_IsServerClosed)   
    {   
        return true;   
    }   
  
    if (m_ServerSocket == null)   
    {   
        return false;   
    }   
  
    m_IsServerClosed = true;   
  
    this.ClearCountValues();   
  
    try  
    {   
        if (!m_ServerSocket.CreateServerSocket())   
        {   
            return false;   
        }   
  
        if (!ThreadPool.QueueUserWorkItem(this.StartServerListening))   
        {   
            return false;   
        }   
  
        if (!ThreadPool.QueueUserWorkItem(this.CheckClientSession))   
        {   
            return false;   
        }   
  
        m_IsServerClosed = false;   
  
        this.OnServerStarted();   
    }   
    catch (System.Exception e)   
    {   
        this.OnExceptionOccurred(e);   
        throw;   
    }   
  
    return !m_IsServerClosed;   
}  
        /// <summary>
        /// Starts the server.
        /// </summary>
        /// <returns>success or failed</returns>
        public bool StartServer()
        {
            if (!m_IsServerClosed)
            {
                return true;
            }

            if (m_ServerSocket == null)
            {
                return false;
            }

            m_IsServerClosed = true;

            this.ClearCountValues();

            try
            {
                if (!m_ServerSocket.CreateServerSocket())
                {
                    return false;
                }

                if (!ThreadPool.QueueUserWorkItem(this.StartServerListening))
                {
                    return false;
                }

                if (!ThreadPool.QueueUserWorkItem(this.CheckClientSession))
                {
                    return false;
                }

                m_IsServerClosed = false;

                this.OnServerStarted();
            }
            catch (System.Exception e)
            {
                this.OnExceptionOccurred(e);
                throw;
            }

            return !m_IsServerClosed;
        } 

view plaincopy to clipboardprint?
/// <summary>   
/// Starts the server listening, this is a loop until the current server is closed.   
/// </summary>   
/// <param name="state"></param>   
private void StartServerListening(object state)   
{   
    m_CheckSocketAcceptResetEvent.Reset();   
  
    Socket clientSocket = null;   
    while (!m_IsServerClosed)   
    {   
        if (m_ServerSocket != null)   
        {   
            m_ServerSocket.StartListening(clientSocket);   
        }   
    }   
  
    m_CheckSocketAcceptResetEvent.Set();   
}   
  
/// <summary>   
/// Checks the rtsp client session.   
/// In this method, we do lost of things such as check the client session   
/// is time out or not, check the client session is tear down or not.   
/// </summary>   
/// <param name="state"></param>   
private void CheckClientSession(object state)   
{   
    m_CheckClientSessionResetEvent.Reset();   
  
    while (!m_IsServerClosed)   
    {   
        lock (m_ClientSessionTable)   
        {   
            List<int> clientSessionIdList = new List<int>();   
  
            foreach (ClientSession session in m_ClientSessionTable.Values)   
            {   
                if (m_IsServerClosed)   
                {   
                    break;   
                }   
  
                if (session.SessionState == ClientSessionState.Teardown)   
                {   
                    m_ServerSocket.CloseClientSocket(session.Socket);   
                    clientSessionIdList.Add(session.ClientSessionId);   
                    this.OnClientSessionDisconnected(session);   
                }   
                else  
                {   
                    if (session.CheckTimeout(m_MaxSessionTimeout))   
                    {   
                        this.OnClientSessionTimeout(session);   
                    }   
                }   
            }   
  
            foreach (int id in clientSessionIdList)   
            {   
                m_ClientSessionTable.Remove(id);   
            }   
  
            clientSessionIdList.Clear();   
        }   
  
        // Sleep the current thread, and then continue to check the client session.   
        Thread.Sleep(100);   
    }   
  
    m_CheckClientSessionResetEvent.Set();   
}  
        /// <summary>
        /// Starts the server listening, this is a loop until the current server is closed.
        /// </summary>
        /// <param name="state"></param>
        private void StartServerListening(object state)
        {
            m_CheckSocketAcceptResetEvent.Reset();

            Socket clientSocket = null;
            while (!m_IsServerClosed)
            {
                if (m_ServerSocket != null)
                {
                    m_ServerSocket.StartListening(clientSocket);
                }
            }

            m_CheckSocketAcceptResetEvent.Set();
        }

        /// <summary>
        /// Checks the rtsp client session.
        /// In this method, we do lost of things such as check the client session
        /// is time out or not, check the client session is tear down or not.
        /// </summary>
        /// <param name="state"></param>
        private void CheckClientSession(object state)
        {
            m_CheckClientSessionResetEvent.Reset();

            while (!m_IsServerClosed)
            {
                lock (m_ClientSessionTable)
                {
                    List<int> clientSessionIdList = new List<int>();

                    foreach (ClientSession session in m_ClientSessionTable.Values)
                    {
                        if (m_IsServerClosed)
                        {
                            break;
                        }

                        if (session.SessionState == ClientSessionState.Teardown)
                        {
                            m_ServerSocket.CloseClientSocket(session.Socket);
                            clientSessionIdList.Add(session.ClientSessionId);
                            this.OnClientSessionDisconnected(session);
                        }
                        else
                        {
                            if (session.CheckTimeout(m_MaxSessionTimeout))
                            {
                                this.OnClientSessionTimeout(session);
                            }
                        }
                    }

                    foreach (int id in clientSessionIdList)
                    {
                        m_ClientSessionTable.Remove(id);
                    }

                    clientSessionIdList.Clear();
                }

                // Sleep the current thread, and then continue to check the client session.
                Thread.Sleep(100);
            }

            m_CheckClientSessionResetEvent.Set();
        } 

    上述代码中,m_ServerSocket为侦听类的一个对象,m_ServerSocket.CreateServerSocket()则创建了一个基于TCP协议的套接字,通过while循环来进行不断侦听,侦听的代码片段如下:
view plaincopy to clipboardprint?
/// <summary>   
/// Starts the listening.   
/// </summary>   
/// <param name="clientSocket">The client socket(null).</param>   
public void StartListening(Socket clientSocket)   
{   
    if (m_Socket == null)   
    {   
        this.CreateServerSocket();   
    }   
  
    try  
    {   
        if (m_Socket.Poll(m_AcceptListenTimeInterval, SelectMode.SelectRead))   
        {   
            clientSocket = m_Socket.Accept();   
            if (clientSocket != null && clientSocket.Connected)   
            {   
                this.OnClientSocketAccepted(clientSocket);   
            }   
            else  
            {   
                // The client socket is null or it was not connected, so we   
                // try to close it.   
                this.CloseClientSocket(clientSocket);   
            }   
        }   
    }   
    catch (System.Exception e)   
    {   
        this.CloseClientSocket(clientSocket);   
        this.OnExceptionOccurred(e);   
        throw;   
    }   
}  
        /// <summary>
        /// Starts the listening.
        /// </summary>
        /// <param name="clientSocket">The client socket(null).</param>
        public void StartListening(Socket clientSocket)
        {
            if (m_Socket == null)
            {
                this.CreateServerSocket();
            }

            try
            {
                if (m_Socket.Poll(m_AcceptListenTimeInterval, SelectMode.SelectRead))
                {
                    clientSocket = m_Socket.Accept();
                    if (clientSocket != null && clientSocket.Connected)
                    {
                        this.OnClientSocketAccepted(clientSocket);
                    }
                    else
                    {
                        // The client socket is null or it was not connected, so we
                        // try to close it.
                        this.CloseClientSocket(clientSocket);
                    }
                }
            }
            catch (System.Exception e)
            {
                this.CloseClientSocket(clientSocket);
                this.OnExceptionOccurred(e);
                throw;
            }
        } 

    每当有新的连接进来时,通过OnClientSocketAccepted(Socket socket)事件将socket带回到RtspServer类中,并在RtspServer类中做相应的处理,代码片段如下:
view plaincopy to clipboardprint?
/// <summary>   
/// Called when [client socket accepted].   
/// </summary>   
/// <param name="sender">The sender.</param>   
/// <param name="e">The <see cref="Simon.SharpStreamingServer.Core.TSocketEventArgs"/> instance containing the event data.</param>   
protected virtual void OnClientSocketAccepted(object sender, TSocketEventArgs e)   
{   
    // Is the current connection count greater than the maximum   
    // connection count? If so, we close the client socket, or   
    // else, we add the client session as a new client session.   
    if (m_CurConnectionCount > m_MaxConnectionCount)   
    {   
        if (m_ServerSocket != null)   
        {   
            m_ServerSocket.CloseClientSocket(e.ClientSocket);   
            this.OnClientSessionRejected();   
        }   
    }   
    else  
    {   
        this.AddClientSession(e.ClientSocket);   
    }   
}  
        /// <summary>
        /// Called when [client socket accepted].
        /// </summary>
        /// <param name="sender">The sender.</param>
        /// <param name="e">The <see cref="Simon.SharpStreamingServer.Core.TSocketEventArgs"/> instance containing the event data.</param>
        protected virtual void OnClientSocketAccepted(object sender, TSocketEventArgs e)
        {
            // Is the current connection count greater than the maximum
            // connection count? If so, we close the client socket, or
            // else, we add the client session as a new client session.
            if (m_CurConnectionCount > m_MaxConnectionCount)
            {
                if (m_ServerSocket != null)
                {
                    m_ServerSocket.CloseClientSocket(e.ClientSocket);
                    this.OnClientSessionRejected();
                }
            }
            else
            {
                this.AddClientSession(e.ClientSocket);
            }
        } 

    在上述这个方法中,首先判断连接数,如果当前连接数大于最大允许的连接数,则拒绝连接请求,否则将新增客户端会话(创建新的客户端会话对象)。

    再回过头来看CheckClientSession(object state)方法,该方法同样是通过一个while循环来不断地检查每一个客户端会话的状态,如果状态为Teardown的,则关闭其socket并最终将其从会话列表中移除,同时亦会检查每一个客户端会话是否超时(如果超时,则将其状态标记为Teardown)。m_ClientSessionTable用于维护客户端会话列表,为Dictionary<int, ClientSession>类型。

    此外,使用ManualResetEvent对象来管理多个线程的同步,这主要是在停止服务器时发挥作用。停止服务器的代码片段如下:
view plaincopy to clipboardprint?
/// <summary>   
/// Stops the server.   
/// </summary>   
/// <returns>success or failed</returns>   
public bool StopServer()   
{   
    if (m_IsServerClosed)   
    {   
        return true;   
    }   
  
    m_IsServerClosed = true;   
  
    // First, waits for two threads.   
    m_CheckSocketAcceptResetEvent.WaitOne();   
    m_CheckClientSessionResetEvent.WaitOne();   
  
    // And then, closes each client socket.   
    if (m_ClientSessionTable != null && m_ServerSocket != null)   
    {   
        lock (m_ClientSessionTable)   
        {   
            foreach (ClientSession session in m_ClientSessionTable.Values)   
            {   
                m_ServerSocket.CloseClientSocket(session.Socket);   
            }   
        }   
    }   
  
    // Then, closes the server socket.   
    if (m_ServerSocket != null)   
    {   
        m_ServerSocket.CloseServerSocket();   
    }   
  
    // At last, clears the client session table.   
    if (m_ClientSessionTable != null)   
    {   
        lock (m_ClientSessionTable)   
        {   
            m_ClientSessionTable.Clear();   
        }   
    }   
  
    this.OnServerStopped();   
  
    return true;   
}  
        /// <summary>
        /// Stops the server.
        /// </summary>
        /// <returns>success or failed</returns>
        public bool StopServer()
        {
            if (m_IsServerClosed)
            {
                return true;
            }

            m_IsServerClosed = true;

            // First, waits for two threads.
            m_CheckSocketAcceptResetEvent.WaitOne();
            m_CheckClientSessionResetEvent.WaitOne();

            // And then, closes each client socket.
            if (m_ClientSessionTable != null && m_ServerSocket != null)
            {
                lock (m_ClientSessionTable)
                {
                    foreach (ClientSession session in m_ClientSessionTable.Values)
                    {
                        m_ServerSocket.CloseClientSocket(session.Socket);
                    }
                }
            }

            // Then, closes the server socket.
            if (m_ServerSocket != null)
            {
                m_ServerSocket.CloseServerSocket();
            }

            // At last, clears the client session table.
            if (m_ClientSessionTable != null)
            {
                lock (m_ClientSessionTable)
                {
                    m_ClientSessionTable.Clear();
                }
            }

            this.OnServerStopped();

            return true;
        } 

    每当一个新客户端会话对象被创建后,即开始异步接收数据。代码片段如下:
view plaincopy to clipboardprint?
/// <summary>   
/// Adds a new client session   
/// </summary>   
/// <param name="socket">client socket</param>   
private void AddClientSession(Socket socket)   
{   
    Interlocked.Increment(ref m_ClientSessionSeqNo);   
  
    ClientSession clientSession = new ClientSession(m_ClientSessionSeqNo, socket);   
    clientSession.ExceptionOccurred += new EventHandler<TExceptionEventArgs>(this.OnExceptionOccurred);   
  
    lock(m_ClientSessionTable)   
    {   
        m_ClientSessionTable.Add(m_ClientSessionSeqNo, clientSession);   
    }   
  
    // Starts handling the rtsp client request.   
    clientSession.ReceiveClientRequest();   
  
    this.OnClientSessionConnected(clientSession);   
}  
        /// <summary>
        /// Adds a new client session
        /// </summary>
        /// <param name="socket">client socket</param>
        private void AddClientSession(Socket socket)
        {
            Interlocked.Increment(ref m_ClientSessionSeqNo);

            ClientSession clientSession = new ClientSession(m_ClientSessionSeqNo, socket);
            clientSession.ExceptionOccurred += new EventHandler<TExceptionEventArgs>(this.OnExceptionOccurred);

            lock(m_ClientSessionTable)
            {
                m_ClientSessionTable.Add(m_ClientSessionSeqNo, clientSession);
            }

            // Starts handling the rtsp client request.
            clientSession.ReceiveClientRequest();

            this.OnClientSessionConnected(clientSession);
        } 

    ReceiveClientRequest()的主要工作就是调用ClientSocket的ReceiveData()方法开始进行数据的异步接收。每当ClientSocket接收到数据时,通过OnDatagramReceived(byte[] revBuffer, int revBufferSize)事件将数据传回给对应的ClientSession对象,并由ClientSession做相应的处理。 

    综上所述的是服务器通信部分的核心实现思路及代码片段,鉴于篇幅关系,在这里将不再贴出全部的代码。


本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/huangxinfeng/archive/2010/07/21/5753798.aspx
本站所有文章欢迎任何形式的转载,但请注明作者及出处,尊重他人劳动成果!
文章转载自:中国研发网 [http://www.yanfaw.com]
本文标题:流媒体服务器通信部分的核心代码实现。
TAG: 代码 服务器 核心 流媒体 通信
顶:1 踩:1
对本文中的事件或人物打分:
当前平均分:0.5 (4次打分)
对本篇资讯内容的质量打分:
当前平均分:-2 (4次打分)
【已经有13人表态】
3票
感动
1票
路过
2票
高兴
1票
难过
1票
搞笑
2票
愤怒
2票
无聊
1票
同情
上一篇 下一篇
发表评论

网友评论仅供网友表达个人看法,并不表明本网同意其观点或证实其描述。

查看全部回复【已有0位网友发表了看法】