xrootd
XrdClAsyncMsgReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
21 
22 #include "XrdCl/XrdClMessage.hh"
25 #include "XrdCl/XrdClSocket.hh"
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClStream.hh"
28 
29 #include <memory>
30 
31 namespace XrdCl
32 {
33  //----------------------------------------------------------------------------
35  //----------------------------------------------------------------------------
37  {
38  public:
39  //------------------------------------------------------------------------
47  //------------------------------------------------------------------------
49  Socket &socket,
50  const std::string &strmname,
51  Stream &strm,
52  uint16_t substrmnb) : readstage( ReadStart ),
54  socket( socket ),
55  strmname( strmname ),
56  strm( strm ),
58  inmsgsize( 0 ),
59  inhandler( nullptr )
60  {
61  }
62 
63  //------------------------------------------------------------------------
65  //------------------------------------------------------------------------
66  virtual ~AsyncMsgReader(){ }
67 
68  //------------------------------------------------------------------------
70  //------------------------------------------------------------------------
71  inline void Reset()
72  {
74  inmsg.reset();
75  inmsgsize = 0;
76  inhandler = nullptr;
77  }
78 
79  //------------------------------------------------------------------------
81  //------------------------------------------------------------------------
83  {
84  Log *log = DefaultEnv::GetLog();
85 
86  while( true )
87  {
88  switch( readstage )
89  {
90  //------------------------------------------------------------------
91  // There is no incoming message currently being processed so we
92  // create a new one
93  //------------------------------------------------------------------
94  case ReadStart:
95  {
96  inmsg = std::make_shared<Message>();
97  //----------------------------------------------------------------
98  // The next step is to read the header
99  //----------------------------------------------------------------
101  continue;
102  }
103  //------------------------------------------------------------------
104  // We need to read the header
105  //------------------------------------------------------------------
106  case ReadHeader:
107  {
109  if( !st.IsOK() || st.code == suRetry )
110  return st;
111 
112  log->Dump( AsyncSockMsg, "[%s] Received message header for 0x%x size: %d",
113  strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
114 
115  ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
116  if( rsp->hdr.status == kXR_attn )
117  {
118  log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
119  "of message 0x%x", strmname.c_str(), inmsg.get() );
120  inmsg->ReAllocate( 16 ); // header (bytes 8) + action code (8 bytes)
122  continue;
123  }
124 
125  inmsgsize = inmsg->GetCursor();
127 
128  if( inhandler )
129  {
130  log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
131  "of message 0x%x", strmname.c_str(), inmsg.get() );
132  //--------------------------------------------------------------
133  // The next step is to read raw data
134  //--------------------------------------------------------------
136  continue;
137  }
138  //----------------------------------------------------------------
139  // The next step is to read the message body
140  //----------------------------------------------------------------
142  continue;
143  }
144  //------------------------------------------------------------------
145  // Before proceeding we need to figure out the attn action code
146  //------------------------------------------------------------------
147  case ReadAttn:
148  {
150  if( !st.IsOK() || st.code == suRetry )
151  return st;
152 
153  //----------------------------------------------------------------
154  // There is an embedded response, overwrite the message with that
155  //----------------------------------------------------------------
156  if( HasEmbeddedRsp() )
157  {
158  inmsg->Free();
160  continue;
161  }
162 
163  //----------------------------------------------------------------
164  // Readout the rest of the body
165  //----------------------------------------------------------------
166  inmsgsize = inmsg->GetCursor();
168  continue;
169  }
170  //------------------------------------------------------------------
171  // We need to call a raw message handler to get the data from the
172  // socket
173  //------------------------------------------------------------------
174  case ReadRawData:
175  {
176  uint32_t bytesRead = 0;
177  XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
178  if( !st.IsOK() )
179  return st;
180  inmsgsize += bytesRead;
181  if( st.code == suRetry )
182  return st;
183  //----------------------------------------------------------------
184  // The next step is to finalize the read
185  //----------------------------------------------------------------
187  continue;
188  }
189  //------------------------------------------------------------------
190  // No raw handler, so we read the message to the buffer
191  //------------------------------------------------------------------
192  case ReadMsgBody:
193  {
195  if( !st.IsOK() || st.code == suRetry )
196  return st;
197  inmsgsize = inmsg->GetCursor();
198 
199  //----------------------------------------------------------------
200  // Now check if there are some additional raw data to be read
201  //----------------------------------------------------------------
202  if( !inhandler )
203  {
204  uint16_t action = strm.InspectStatusRsp( substrmnb,
205  inhandler );
206 
207  if( action & MsgHandler::Corrupted )
209 
210  if( action & MsgHandler::Raw )
211  {
212  //------------------------------------------------------------
213  // The next step is to read the raw data
214  //------------------------------------------------------------
216  continue;
217  }
218 
219  if( action & MsgHandler::More )
220  {
221  //------------------------------------------------------------
222  // The next step is to read the additional data in the message
223  // body
224  //------------------------------------------------------------
226  continue;
227  }
228  }
229  //------------------------------------------------------------
230  // The next step is to finalize the read
231  //------------------------------------------------------------
233  continue;
234  }
235 
236  case ReadDone:
237  {
238  //----------------------------------------------------------------
239  // Report the incoming message
240  //----------------------------------------------------------------
241  log->Dump( AsyncSockMsg, "[%s] Received message 0x%x of %d bytes",
242  strmname.c_str(), inmsg.get(), inmsgsize );
243 
244  strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
245  }
246  }
247  // just in case
248  break;
249  }
250 
251  //----------------------------------------------------------------------
252  // We are done
253  //----------------------------------------------------------------------
254  return XRootDStatus();
255  }
256 
257  private:
258 
260  {
261  //----------------------------------------------------------------------
262  // Readout the action code from the socket. We are reading out 8 bytes
263  // into the message, the 8 byte header is already there.
264  //----------------------------------------------------------------------
265  size_t btsleft = 8 - ( inmsg->GetCursor() - 8 );
266  while( btsleft > 0 )
267  {
268  int btsrd = 0;
269  XRootDStatus st = socket.Read( inmsg->GetBufferAtCursor(), btsleft, btsrd );
270  if( !st.IsOK() || st.code == suRetry )
271  return st;
272  btsleft -= btsrd;
273  inmsg->AdvanceCursor( btsrd );
274  }
275 
276  //----------------------------------------------------------------------
277  // Marshal the action code
278  //----------------------------------------------------------------------
279  ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
280  attn->actnum = ntohl( attn->actnum );
281 
282  return XRootDStatus();
283  }
284 
285  inline bool HasEmbeddedRsp()
286  {
287  ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
288  return ( attn->actnum == kXR_asynresp );
289  }
290 
291  //------------------------------------------------------------------------
293  //------------------------------------------------------------------------
294  enum Stage
295  {
296  ReadStart, //< the next step is to initialize the read
297  ReadHeader, //< the next step is to read the header
298  ReadAttn, //< the next step is to read attn action code
299  ReadMsgBody, //< the next step is to read the body
300  ReadRawData, //< the next step is to read the raw data
301  ReadDone //< the next step is to finalize the read
302  };
303 
304  //------------------------------------------------------------------------
305  // Current read stage
306  //------------------------------------------------------------------------
308 
309  //------------------------------------------------------------------------
310  // The context of the read operation
311  //------------------------------------------------------------------------
314  const std::string &strmname;
316  uint16_t substrmnb;
317 
318 
319  //------------------------------------------------------------------------
320  // The internal state of the the reader
321  //------------------------------------------------------------------------
322  std::shared_ptr<Message> inmsg; //< the ownership is shared with MsgHandler
323  uint32_t inmsgsize;
325 
326  };
327 
328 } /* namespace XrdCl */
329 
330 #endif /* SRC_XRDCL_XRDCLASYNCMSGREADER_HH_ */
@ kXR_asynresp
Definition: XProtocol.hh:931
@ kXR_attn
Definition: XProtocol.hh:894
Utility class encapsulating reading response message logic.
Definition: XrdClAsyncMsgReader.hh:37
void Reset()
Reset the state of the object (makes it ready to read out next msg)
Definition: XrdClAsyncMsgReader.hh:71
Socket & socket
Definition: XrdClAsyncMsgReader.hh:313
TransportHandler & xrdTransport
Definition: XrdClAsyncMsgReader.hh:312
const std::string & strmname
Definition: XrdClAsyncMsgReader.hh:314
std::shared_ptr< Message > inmsg
Definition: XrdClAsyncMsgReader.hh:322
uint32_t inmsgsize
Definition: XrdClAsyncMsgReader.hh:323
Stage readstage
Definition: XrdClAsyncMsgReader.hh:307
Stage
Stages of reading out a response from the socket.
Definition: XrdClAsyncMsgReader.hh:295
@ ReadAttn
Definition: XrdClAsyncMsgReader.hh:298
@ ReadRawData
Definition: XrdClAsyncMsgReader.hh:300
@ ReadMsgBody
Definition: XrdClAsyncMsgReader.hh:299
@ ReadHeader
Definition: XrdClAsyncMsgReader.hh:297
@ ReadStart
Definition: XrdClAsyncMsgReader.hh:296
@ ReadDone
Definition: XrdClAsyncMsgReader.hh:301
bool HasEmbeddedRsp()
Definition: XrdClAsyncMsgReader.hh:285
uint16_t substrmnb
Definition: XrdClAsyncMsgReader.hh:316
XRootDStatus Read()
Read out the response from the socket.
Definition: XrdClAsyncMsgReader.hh:82
virtual ~AsyncMsgReader()
Destructor.
Definition: XrdClAsyncMsgReader.hh:66
MsgHandler * inhandler
Definition: XrdClAsyncMsgReader.hh:324
XRootDStatus ReadAttnActnum()
Definition: XrdClAsyncMsgReader.hh:259
AsyncMsgReader(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
Definition: XrdClAsyncMsgReader.hh:48
Stream & strm
Definition: XrdClAsyncMsgReader.hh:315
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Message handler.
Definition: XrdClPostMasterInterfaces.hh:51
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
Definition: XrdClPostMasterInterfaces.hh:138
@ Raw
Definition: XrdClPostMasterInterfaces.hh:63
@ More
there are more (non-raw) data to be read
Definition: XrdClPostMasterInterfaces.hh:72
@ Corrupted
Definition: XrdClPostMasterInterfaces.hh:69
A network socket.
Definition: XrdClSocket.hh:43
XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
Stream.
Definition: XrdClStream.hh:50
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:290
virtual XRootDStatus GetBody(Message &message, Socket *socket)=0
virtual XRootDStatus GetHeader(Message &message, Socket *socket)=0
Request status.
Definition: XrdClXRootDResponses.hh:219
Definition: XrdClAnyObject.hh:26
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t AsyncSockMsg
Definition: XrdClConstants.hh:41
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
Definition: XProtocol.hh:934
kXR_int32 actnum
Definition: XProtocol.hh:935
kXR_unt16 status
Definition: XProtocol.hh:908
Definition: XProtocol.hh:1277
ServerResponseHeader hdr
Definition: XProtocol.hh:1278
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:146
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:123