xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClAsyncMsgReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
21 
22 #include "XrdCl/XrdClMessage.hh"
25 #include "XrdCl/XrdClSocket.hh"
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClStream.hh"
28 
29 #include <memory>
30 
31 namespace XrdCl
32 {
33  //----------------------------------------------------------------------------
35  //----------------------------------------------------------------------------
37  {
38  public:
39  //------------------------------------------------------------------------
47  //------------------------------------------------------------------------
49  Socket &socket,
50  const std::string &strmname,
51  Stream &strm,
52  uint16_t substrmnb) : readstage( ReadStart ),
53  xrdTransport( xrdTransport ),
54  socket( socket ),
55  strmname( strmname ),
56  strm( strm ),
57  substrmnb( substrmnb ),
58  inmsgsize( 0 ),
59  inhandler( nullptr )
60  {
61  }
62 
63  //------------------------------------------------------------------------
65  //------------------------------------------------------------------------
66  virtual ~AsyncMsgReader(){ }
67 
68  //------------------------------------------------------------------------
70  //------------------------------------------------------------------------
71  inline void Reset()
72  {
74  inmsg.reset();
75  inmsgsize = 0;
76  inhandler = nullptr;
77  }
78 
79  //------------------------------------------------------------------------
81  //------------------------------------------------------------------------
83  {
84  Log *log = DefaultEnv::GetLog();
85 
86  while( true )
87  {
88  switch( readstage )
89  {
90  //------------------------------------------------------------------
91  // There is no incoming message currently being processed so we
92  // create a new one
93  //------------------------------------------------------------------
94  case ReadStart:
95  {
96  inmsg = std::make_shared<Message>();
97  //----------------------------------------------------------------
98  // The next step is to read the header
99  //----------------------------------------------------------------
101  continue;
102  }
103  //------------------------------------------------------------------
104  // We need to read the header
105  //------------------------------------------------------------------
106  case ReadHeader:
107  {
109  if( !st.IsOK() || st.code == suRetry )
110  return st;
111 
112  log->Dump( AsyncSockMsg, "[%s] Received message header for 0x%x size: %d",
113  strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
114 
115  ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
116  if( rsp->hdr.status == kXR_attn )
117  {
118  log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
119  "of message 0x%x", strmname.c_str(), inmsg.get() );
120  inmsg->ReAllocate( 16 ); // header (bytes 8) + action code (8 bytes)
122  continue;
123  }
124 
125  inmsgsize = inmsg->GetCursor();
127 
128  if( inhandler )
129  {
130  log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
131  "of message 0x%x", strmname.c_str(), inmsg.get() );
132  //--------------------------------------------------------------
133  // The next step is to read raw data
134  //--------------------------------------------------------------
136  continue;
137  }
138 
139  //----------------------------------------------------------------
140  // The next step is to read the message body
141  //----------------------------------------------------------------
143  continue;
144  }
145  //------------------------------------------------------------------
146  // Before proceeding we need to figure out the attn action code
147  //------------------------------------------------------------------
148  case ReadAttn:
149  {
151  if( !st.IsOK() || st.code == suRetry )
152  return st;
153 
154  //----------------------------------------------------------------
155  // There is an embedded response, overwrite the message with that
156  //----------------------------------------------------------------
157  if( HasEmbeddedRsp() )
158  {
159  inmsg->Free();
161  continue;
162  }
163 
164  //----------------------------------------------------------------
165  // Readout the rest of the body
166  //----------------------------------------------------------------
167  inmsgsize = inmsg->GetCursor();
169  continue;
170  }
171  //------------------------------------------------------------------
172  // kXR_status is special as it can have both body and raw data,
173  // handle it separately
174  //------------------------------------------------------------------
175  case ReadMore:
176  {
178  if( !st.IsOK() || st.code == suRetry )
179  return st;
180  inmsgsize = inmsg->GetCursor();
181 
182  //----------------------------------------------------------------
183  // The next step is to finalize the read
184  //----------------------------------------------------------------
186  continue;
187  }
188  //------------------------------------------------------------------
189  // We need to call a raw message handler to get the data from the
190  // socket
191  //------------------------------------------------------------------
192  case ReadRawData:
193  {
194  uint32_t bytesRead = 0;
195  XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
196  if( !st.IsOK() )
197  return st;
198  inmsgsize += bytesRead;
199  if( st.code == suRetry )
200  return st;
201  //----------------------------------------------------------------
202  // The next step is to finalize the read
203  //----------------------------------------------------------------
205  continue;
206  }
207  //------------------------------------------------------------------
208  // No raw handler, so we read the message to the buffer
209  //------------------------------------------------------------------
210  case ReadMsgBody:
211  {
213  if( !st.IsOK() || st.code == suRetry )
214  return st;
215  inmsgsize = inmsg->GetCursor();
216 
217 
218  //----------------------------------------------------------------
219  // kXR_status response needs special handling as it can have
220  // either (body + raw data) or (body + additional body data)
221  //----------------------------------------------------------------
222  if( IsStatusRsp() )
223  {
224  uint16_t action = strm.InspectStatusRsp( substrmnb,
225  inhandler );
226 
227  if( action & MsgHandler::Corrupted )
229 
230  if( action & MsgHandler::Raw )
231  {
232  //--------------------------------------------------------------
233  // The next step is to read the raw data
234  //--------------------------------------------------------------
236  continue;
237  }
238 
239  if( action & MsgHandler::More )
240  {
241 
242  //--------------------------------------------------------------
243  // The next step is to read the additional data in the message
244  // body
245  //--------------------------------------------------------------
247  continue;
248  }
249  }
250 
251  //----------------------------------------------------------------
252  // The next step is to finalize the read
253  //----------------------------------------------------------------
255  continue;
256  }
257 
258  case ReadDone:
259  {
260  //----------------------------------------------------------------
261  // Report the incoming message
262  //----------------------------------------------------------------
263  log->Dump( AsyncSockMsg, "[%s] Received message 0x%x of %d bytes",
264  strmname.c_str(), inmsg.get(), inmsgsize );
265 
266  strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
267  }
268  }
269  // just in case
270  break;
271  }
272 
273  //----------------------------------------------------------------------
274  // We are done
275  //----------------------------------------------------------------------
276  return XRootDStatus();
277  }
278 
279  private:
280 
282  {
283  //----------------------------------------------------------------------
284  // Readout the action code from the socket. We are reading out 8 bytes
285  // into the message, the 8 byte header is already there.
286  //----------------------------------------------------------------------
287  size_t btsleft = 8 - ( inmsg->GetCursor() - 8 );
288  while( btsleft > 0 )
289  {
290  int btsrd = 0;
291  XRootDStatus st = socket.Read( inmsg->GetBufferAtCursor(), btsleft, btsrd );
292  if( !st.IsOK() || st.code == suRetry )
293  return st;
294  btsleft -= btsrd;
295  inmsg->AdvanceCursor( btsrd );
296  }
297 
298  //----------------------------------------------------------------------
299  // Marshal the action code
300  //----------------------------------------------------------------------
301  ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
302  attn->actnum = ntohl( attn->actnum );
303 
304  return XRootDStatus();
305  }
306 
307  inline bool IsStatusRsp()
308  {
309  ServerResponseHeader *hdr = (ServerResponseHeader*)inmsg->GetBuffer();
310  return ( hdr->status == kXR_status );
311  }
312 
313  inline bool HasEmbeddedRsp()
314  {
315  ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
316  return ( attn->actnum == kXR_asynresp );
317  }
318 
319  //------------------------------------------------------------------------
321  //------------------------------------------------------------------------
// Stages of reading out a response from the socket; each enumerator names
// the step the reader performs next.
322 enum Stage
323 {
324 ReadStart, ///< the next step is to initialize the read (create a new message)
325 ReadHeader, ///< the next step is to read the message header
326 ReadAttn, ///< the next step is to read the attn action code
327 ReadMore, ///< the next step is to read additional kXR_status body data
328 ReadMsgBody, ///< the next step is to read the message body into the buffer
329 ReadRawData, ///< the next step is to read the raw data via the raw handler
330 ReadDone ///< the next step is to finalize the read and report the message
331 };
332 
333  //------------------------------------------------------------------------
334  // Current read stage
335  //------------------------------------------------------------------------
337 
338  //------------------------------------------------------------------------
339  // The context of the read operation
340  //------------------------------------------------------------------------
343  const std::string &strmname;
345  uint16_t substrmnb;
346 
347 
348  //------------------------------------------------------------------------
349 // The internal state of the reader
350  //------------------------------------------------------------------------
351  std::shared_ptr<Message> inmsg; //< the ownership is shared with MsgHandler
352  uint32_t inmsgsize;
354 
355  };
356 
357 } /* namespace XrdCl */
358 
359 #endif /* SRC_XRDCL_XRDCLASYNCMSGREADER_HH_ */
Definition: XrdClAsyncMsgReader.hh:325
Definition: XrdClAsyncMsgReader.hh:326
bool IsStatusRsp()
Definition: XrdClAsyncMsgReader.hh:307
Definition: XProtocol.hh:936
std::shared_ptr< Message > inmsg
Definition: XrdClAsyncMsgReader.hh:351
XRootDStatus Read()
Read out the response from the socket.
Definition: XrdClAsyncMsgReader.hh:82
TransportHandler & xrdTransport
Definition: XrdClAsyncMsgReader.hh:341
Socket & socket
Definition: XrdClAsyncMsgReader.hh:342
const uint64_t AsyncSockMsg
Definition: XrdClConstants.hh:41
bool HasEmbeddedRsp()
Definition: XrdClAsyncMsgReader.hh:313
Message handler.
Definition: XrdClPostMasterInterfaces.hh:50
virtual ~AsyncMsgReader()
Destructor.
Definition: XrdClAsyncMsgReader.hh:66
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
Definition: XrdClPostMasterInterfaces.hh:138
kXR_unt16 status
Definition: XProtocol.hh:913
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
Stage readstage
Definition: XrdClAsyncMsgReader.hh:336
AsyncMsgReader(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
Definition: XrdClAsyncMsgReader.hh:48
const std::string & strmname
Definition: XrdClAsyncMsgReader.hh:343
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClPostMasterInterfaces.hh:69
Definition: XrdClAsyncMsgReader.hh:329
XRootDStatus ReadAttnActnum()
Definition: XrdClAsyncMsgReader.hh:281
MsgHandler * inhandler
Definition: XrdClAsyncMsgReader.hh:353
static Log * GetLog()
Get default log.
Definition: XProtocol.hh:899
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:309
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
Request status.
Definition: XrdClXRootDResponses.hh:218
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
Definition: XProtocol.hh:1281
Definition: XProtocol.hh:911
Definition: XrdClAsyncMsgReader.hh:328
uint16_t substrmnb
Definition: XrdClAsyncMsgReader.hh:345
Utility class encapsulating reading response message logic.
Definition: XrdClAsyncMsgReader.hh:36
Definition: XrdClAsyncMsgReader.hh:330
Definition: XrdClAsyncMsgReader.hh:327
void Reset()
Reset the state of the object (makes it ready to read out next msg)
Definition: XrdClAsyncMsgReader.hh:71
ServerResponseHeader hdr
Definition: XProtocol.hh:1283
const uint16_t suRetry
Definition: XrdClStatus.hh:40
Stream.
Definition: XrdClStream.hh:51
virtual XRootDStatus GetMore(Message &message, Socket *socket)=0
Stage
Stages of reading out a response from the socket.
Definition: XrdClAsyncMsgReader.hh:322
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
Definition: XrdClAsyncMsgReader.hh:324
Stream & strm
Definition: XrdClAsyncMsgReader.hh:344
virtual XRootDStatus GetBody(Message &message, Socket *socket)=0
virtual XRootDStatus GetHeader(Message &message, Socket *socket)=0
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
there are more (non-raw) data to be read
Definition: XrdClPostMasterInterfaces.hh:72
Definition: XProtocol.hh:905
kXR_int32 actnum
Definition: XProtocol.hh:940
A network socket.
Definition: XrdClSocket.hh:42
uint32_t inmsgsize
Definition: XrdClAsyncMsgReader.hh:352
Handle diagnostics.
Definition: XrdClLog.hh:100
Definition: XProtocol.hh:939
Definition: XrdClPostMasterInterfaces.hh:63
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)