68 friend class PgReadRetryHandler;
75 PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77 uint64_t orgOffset ) :
78 stateHandler( stateHandler ),
79 userHandler( userHandler ),
80 orgOffset( orgOffset ),
94 using namespace XrdCl;
96 std::unique_lock<std::mutex> lck( mtx );
104 if( !status->
IsOK() )
119 PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
121 userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
124 userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
135 if( !status->
IsOK() )
140 userHandler->HandleResponseWithHosts( status, response, hostList );
152 response->
Get( pginf );
156 std::vector<uint32_t> &cksums = pginf->
GetCksums();
157 char *buffer =
reinterpret_cast<char*
>( pginf->
GetBuffer() );
160 if( pgsize > bytesRead ) pgsize = bytesRead;
162 for(
size_t pgnb = 0; pgnb < nbpages; ++pgnb )
165 if( crcval != cksums[pgnb] )
168 log->
Info( FileMsg,
"[0x%x@%s] Received corrupted page, will retry page #%d.",
169 this, stateHandler->pFileUrl->GetURL().c_str(), pgnb );
184 if( pgsize > bytesRead ) pgsize = bytesRead;
193 userHandler->HandleResponseWithHosts( status, response, hostList );
202 resp.reset( response );
203 hosts.reset( hostList );
207 void UpdateCksum(
size_t pgnb, uint32_t crcval )
219 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
223 std::unique_ptr<XrdCl::AnyObject> resp;
224 std::unique_ptr<XrdCl::HostList> hosts;
225 std::unique_ptr<XrdCl::XRootDStatus> st;
241 PgReadRetryHandler( PgReadHandler *pgReadHandler,
size_t pgnb ) : pgReadHandler( pgReadHandler ),
254 using namespace XrdCl;
256 if( !status->
IsOK() )
259 log->
Info( FileMsg,
"[0x%x@%s] Failed to recover page #%d.",
260 this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
261 pgReadHandler->HandleResponseWithHosts( status, response, hostList );
267 response->
Get( pginf );
271 log->
Info( FileMsg,
"[0x%x@%s] Failed to recover page #%d.",
272 this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
274 DeleteArgs( status, response, hostList );
275 pgReadHandler->HandleResponseWithHosts(
new XRootDStatus( stError, errDataError ), 0, 0 );
281 if( crcval != pginf->
GetCksums().front() )
284 log->
Info( FileMsg,
"[0x%x@%s] Failed to recover page #%d.",
285 this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
286 DeleteArgs( status, response, hostList );
287 pgReadHandler->HandleResponseWithHosts(
new XRootDStatus( stError, errDataError ), 0, 0 );
293 log->
Info( FileMsg,
"[0x%x@%s] Successfully recovered page #%d.",
294 this, pgReadHandler->stateHandler->pFileUrl->GetURL().c_str(), pgnb );
296 DeleteArgs( 0, response, hostList );
297 pgReadHandler->UpdateCksum( pgnb, crcval );
298 pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
313 PgReadHandler *pgReadHandler;
329 stateHandler( stateHandler ),
330 userHandler( userHandler )
341 if( !status->
IsOK() )
348 using namespace XrdCl;
351 rdresp->
Get( chunk );
353 std::vector<uint32_t> cksums;
354 if( stateHandler->pIsChannelEncrypted )
359 cksums.reserve( nbpages );
361 size_t size = chunk->
length;
362 char *buffer =
reinterpret_cast<char*
>( chunk->
buffer );
364 for(
size_t pg = 0; pg < nbpages; ++pg )
367 if( pgsize > size ) pgsize = size;
369 cksums.push_back( crcval );
376 chunk->
buffer, std::move( cksums ) );
379 response->
Set( pages );
380 userHandler->HandleResponseWithHosts( status, response, hostList );
387 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
401 OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
403 pStateHandler( stateHandler ),
404 pUserHandler( userHandler )
415 using namespace XrdCl;
422 response->
Get( openInfo );
428 if( status->
code == errRedirect )
434 pStateHandler->pPlugin = ecHandler;
435 ecHandler->
Open( pStateHandler->pOpenFlags, pUserHandler, 0 );
443 pStateHandler->OnOpen( status, openInfo, hostList );
446 pUserHandler->HandleResponseWithHosts( status, 0, hostList );
456 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
470 CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
473 pStateHandler( stateHandler ),
474 pUserHandler( userHandler ),
482 virtual ~CloseHandler()
494 pStateHandler->OnClose( status );
496 pUserHandler->HandleResponseWithHosts( status, response, hostList );
508 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
522 StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
526 pStateHandler( stateHandler ),
527 pUserHandler( userHandler ),
529 pSendParams( sendParams )
536 virtual ~StatefulHandler()
539 delete pSendParams.chunkList;
540 delete pSendParams.kbuff;
550 using namespace XrdCl;
551 std::unique_ptr<AnyObject> responsePtr( response );
552 pSendParams.hostList = hostList;
557 if( !status->
IsOK() )
566 responsePtr.release();
569 pUserHandler->HandleResponseWithHosts( status, response, hostList );
588 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
605 buffer( std::move( buffer ) ),
641 pFileState( Closed ),
647 pWrtRecoveryRedir( 0 ),
652 pDoRecoverRead( true ),
653 pDoRecoverWrite( true ),
654 pFollowRedirects( true ),
655 pUseVirtRedirector( true ),
656 pIsChannelEncrypted( false ),
657 pAllowBundledClose( false ),
660 pFileHandle =
new uint8_t[4];
661 ResetMonitoringVars();
674 pFileState( Closed ),
680 pWrtRecoveryRedir( 0 ),
685 pDoRecoverRead( true ),
686 pDoRecoverWrite( true ),
687 pFollowRedirects( true ),
688 pUseVirtRedirector( useVirtRedirector ),
689 pAllowBundledClose( false ),
692 pFileHandle =
new uint8_t[4];
693 ResetMonitoringVars();
724 ResetMonitoringVars();
738 delete pLoadBalancer;
739 delete [] pFileHandle;
740 delete pLFileHandler;
747 const std::string &url,
758 if( self->pFileState ==
Error )
759 return self->pStatus;
777 if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
780 registry.
Release( *self->pFileUrl );
782 delete self->pFileUrl;
786 self->pFileUrl =
new URL( url );
794 char requuid[37]= {0};
795 uuid_generate( uuid );
796 uuid_unparse( uuid, requuid );
797 cgi[
"xrdcl.requuid"] = requuid;
798 self->pFileUrl->SetParams( cgi );
800 if( !self->pFileUrl->IsValid() )
802 log->
Error(
FileMsg,
"[0x%x@%s] Trying to open invalid url: %s",
803 self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
805 self->pFileState =
Closed;
806 return self->pStatus;
813 URL::ParamsMap::const_iterator it;
814 it = urlParams.find(
"xrdcl.recover-reads" );
815 if( (it != urlParams.end() && it->second ==
"false") ||
816 !self->pDoRecoverRead )
818 self->pDoRecoverRead =
false;
819 log->
Debug(
FileMsg,
"[0x%x@%s] Read recovery procedures are disabled",
820 self.get(), self->pFileUrl->GetURL().c_str() );
823 it = urlParams.find(
"xrdcl.recover-writes" );
824 if( (it != urlParams.end() && it->second ==
"false") ||
825 !self->pDoRecoverWrite )
827 self->pDoRecoverWrite =
false;
828 log->
Debug(
FileMsg,
"[0x%x@%s] Write recovery procedures are disabled",
829 self.get(), self->pFileUrl->GetURL().c_str() );
835 log->
Debug(
FileMsg,
"[0x%x@%s] Sending an open command", self.get(),
836 self->pFileUrl->GetURL().c_str() );
838 self->pOpenMode = mode;
839 self->pOpenFlags = flags;
844 std::string path = self->pFileUrl->GetPathWithFilteredParams();
850 req->
dlen = path.length();
851 msg->
Append( path.c_str(), path.length(), 24 );
858 XRootDStatus st = self->IssueRequest( *self->pFileUrl, msg, openHandler, params );
864 self->pFileState =
Closed;
882 if( self->pFileState ==
Error )
883 return self->pStatus;
888 if( self->pFileState ==
Closed )
894 if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
900 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a close command for handle 0x%x to "
901 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
902 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
912 memcpy( req->
fhandle, self->pFileHandle, 4 );
916 CloseHandler *closeHandler =
new CloseHandler( self, handler, msg );
923 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
933 self->pFileState =
Closed;
942 self->pFileState =
Error;
958 if( self->pFileState ==
Error )
return self->pStatus;
976 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a stat command for handle 0x%x to "
977 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
978 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
987 std::string path = self->pFileUrl->GetPath();
991 memcpy( req->
fhandle, self->pFileHandle, 4 );
1000 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1002 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1017 if( self->pFileState ==
Error )
return self->pStatus;
1023 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a read command for handle 0x%x to "
1024 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1025 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1034 memcpy( req->
fhandle, self->pFileHandle, 4 );
1037 list->push_back(
ChunkInfo( offset, size, buffer ) );
1046 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1048 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1061 int issupported =
true;
1074 issupported =
false;
1079 self.get(), self->pFileUrl->GetURL().c_str() );
1081 auto st =
Read( self, offset, size, buffer, substitHandler, timeout );
1082 if( !st.
IsOK() )
delete substitHandler;
1088 if( !st.
IsOK() )
delete pgHandler;
1097 PgReadHandler *handler,
1102 "PgRead retry size exceeded 4KB." );
1106 if( !st.
IsOK() )
delete retryHandler;
1120 if( self->pFileState ==
Error )
return self->pStatus;
1126 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a pgread command for handle 0x%x to "
1127 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1128 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1137 memcpy( req->
fhandle, self->pFileHandle, 4 );
1150 list->push_back(
ChunkInfo( offset, size, buffer ) );
1159 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1161 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1176 if( self->pFileState ==
Error )
return self->pStatus;
1182 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a write command for handle 0x%x to "
1183 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1184 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1193 memcpy( req->
fhandle, self->pFileHandle, 4 );
1196 list->push_back(
ChunkInfo( 0, size, (
char*)buffer ) );
1207 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1209 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1228 log->
Info(
FileMsg,
"[0x%x@%s] Buffer is not page aligned (4KB), cannot "
1229 "convert it to kernel space buffer.", self.get(), self->pFileUrl->GetURL().c_str(),
1230 *((uint32_t*)self->pFileHandle) );
1232 void *buff = buffer.GetBuffer();
1233 uint32_t size = buffer.GetSize();
1234 ReleaseBufferHandler *wrtHandler =
1235 new ReleaseBufferHandler( std::move( buffer ), handler );
1236 XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1239 buffer = std::move( wrtHandler->GetBuffer() );
1248 uint32_t length = buffer.GetSize();
1249 char *ubuff = buffer.Release();
1259 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1277 ssize_t ret = fdoff ?
XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1285 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1295 std::vector<uint32_t> &cksums,
1312 if( cksums.empty() )
1314 const char *data =
static_cast<const char*
>( buffer );
1320 if( crc32cCnt != cksums.size() )
1343 static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1345 if( pgoff == offset )
return 0;
1351 if( !status ) status = s;
1358 auto pgwrt = std::make_shared<pgwrt_t>( handler );
1362 uint32_t fstpglen = fLen;
1364 time_t start = ::time(
nullptr );
1367 std::unique_ptr<AnyObject> scoped( r );
1372 pgwrt->SetStatus( s );
1381 pgwrt->SetStatus( s );
1386 uint16_t elapsed = ::time(
nullptr ) - start;
1387 if( elapsed >= timeout )
1392 else timeout -= elapsed;
1394 for(
size_t i = 0; i < inf->
Size(); ++i )
1396 auto tpl = inf->
At( i );
1397 uint64_t pgoff = std::get<0>( tpl );
1398 uint32_t pglen = std::get<1>( tpl );
1399 const void *pgbuf =
static_cast<const char*
>( buffer ) + ( pgoff - offset );
1400 uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1403 std::unique_ptr<AnyObject> scoped( r );
1407 pgwrt->SetStatus( s );
1417 "page: pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1418 self->pFileUrl->GetURL().c_str(), pgoff, pglen, pgdigest );
1420 "Failed to retransmit corrupted page" ) );
1424 "page: pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1425 self->pFileUrl->GetURL().c_str(), pgoff, pglen, pgdigest );
1427 auto st =
PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1428 if( !st.IsOK() ) pgwrt->SetStatus(
new XRootDStatus( st ) );
1430 "pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1431 self->pFileUrl->GetURL().c_str(), pgoff, pglen, pgdigest );
1435 auto st =
PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1438 pgwrt->handler =
nullptr;
1455 std::vector<uint32_t> cksums{ digest };
1466 std::vector<uint32_t> &cksums,
1473 if( self->pFileState ==
Error )
return self->pStatus;
1479 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a pgwrite command for handle 0x%x to "
1480 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1481 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1492 req->
dlen = size + cksums.size() *
sizeof( uint32_t );
1494 memcpy( req->
fhandle, self->pFileHandle, 4 );
1497 list->push_back(
ChunkInfo( offset, size, (
char*)buffer ) );
1509 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1511 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1523 if( self->pFileState ==
Error )
return self->pStatus;
1529 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a sync command for handle 0x%x to "
1530 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1531 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1538 memcpy( req->
fhandle, self->pFileHandle, 4 );
1547 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1549 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1562 if( self->pFileState ==
Error )
return self->pStatus;
1568 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a truncate command for handle 0x%x to "
1569 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1570 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1577 memcpy( req->
fhandle, self->pFileHandle, 4 );
1587 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1589 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1606 if( self->pFileState ==
Error )
return self->pStatus;
1612 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a vector read command for handle "
1613 "0x%x to %s", self.get(), self->pFileUrl->GetURL().c_str(),
1614 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1627 char *cursor = (
char*)buffer;
1633 for(
size_t i = 0; i < chunks.size(); ++i )
1635 dataChunk[i].
rlen = chunks[i].length;
1636 dataChunk[i].
offset = chunks[i].offset;
1637 memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1642 chunkBuffer = cursor;
1643 cursor += chunks[i].length;
1646 chunkBuffer = chunks[i].buffer;
1648 list->push_back(
ChunkInfo( chunks[i].offset,
1664 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1666 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1682 if( self->pFileState ==
Error )
return self->pStatus;
1688 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a vector write command for handle "
1689 "0x%x to %s", self.get(), self->pFileUrl->GetURL().c_str(),
1690 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1719 for(
size_t i = 0; i < chunks.size(); ++i )
1721 writeList[i].
wlen = chunks[i].length;
1722 writeList[i].
offset = chunks[i].offset;
1723 memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1725 list->push_back(
ChunkInfo( chunks[i].offset,
1727 chunks[i].buffer ) );
1741 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1743 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1751 const struct iovec *
iov,
1758 if( self->pFileState ==
Error )
return self->pStatus;
1764 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a write command for handle 0x%x to "
1765 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1766 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1775 for(
int i = 0; i < iovcnt; ++i )
1777 if(
iov[i].iov_len == 0 )
continue;
1778 size +=
iov[i].iov_len;
1780 (
char*)
iov[i].iov_base ) );
1786 memcpy( req->
fhandle, self->pFileHandle, 4 );
1797 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1799 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1814 if( self->pFileState ==
Error )
return self->pStatus;
1820 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a read command for handle 0x%x to "
1821 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1822 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1829 size_t size = std::accumulate(
iov,
iov + iovcnt, 0, [](
size_t acc, iovec &rhs )
1831 return acc + rhs.iov_len;
1837 memcpy( req->
fhandle, self->pFileHandle, 4 );
1840 list->reserve( iovcnt );
1841 uint64_t choff = offset;
1842 for(
int i = 0; i < iovcnt; ++i )
1844 list->emplace_back( choff,
iov[i].iov_len,
iov[i].iov_base );
1845 choff +=
iov[i].iov_len;
1855 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1857 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1871 if( self->pFileState ==
Error )
return self->pStatus;
1877 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a fcntl command for handle 0x%x to "
1878 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1879 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1888 memcpy( req->
fhandle, self->pFileHandle, 4 );
1898 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1900 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1912 if( self->pFileState ==
Error )
return self->pStatus;
1918 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a visa command for handle 0x%x to "
1919 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1920 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1928 memcpy( req->
fhandle, self->pFileHandle, 4 );
1937 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1939 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1946 const std::vector<xattr_t> &attrs,
1952 if( self->pFileState ==
Error )
return self->pStatus;
1958 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a fattr set command for handle 0x%x to "
1959 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1960 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1965 return XAttrOperationImpl( self,
kXR_fattrSet, 0, attrs, handler, timeout );
1972 const std::vector<std::string> &attrs,
1978 if( self->pFileState ==
Error )
return self->pStatus;
1984 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a fattr get command for handle 0x%x to "
1985 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
1986 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1991 return XAttrOperationImpl( self,
kXR_fattrGet, 0, attrs, handler, timeout );
1998 const std::vector<std::string> &attrs,
2004 if( self->pFileState ==
Error )
return self->pStatus;
2010 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a fattr del command for handle 0x%x to "
2011 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2012 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2017 return XAttrOperationImpl( self,
kXR_fattrDel, 0, attrs, handler, timeout );
2029 if( self->pFileState ==
Error )
return self->pStatus;
2035 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a fattr list command for handle 0x%x to "
2036 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2037 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2042 static const std::vector<std::string> nothing;
2044 nothing, handler, timeout );
2065 if( self->pFileState ==
Error )
return self->pStatus;
2071 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a checkpoint command for "
2072 "handle 0x%x to %s", self.get(), self->pFileUrl->GetURL().c_str(),
2073 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2081 memcpy( req->
fhandle, self->pFileHandle, 4 );
2091 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2093 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2116 if( self->pFileState ==
Error )
return self->pStatus;
2122 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a write command for handle 0x%x to "
2123 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2124 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2133 memcpy( req->
fhandle, self->pFileHandle, 4 );
2138 wrtreq->
dlen = size;
2139 memcpy( wrtreq->
fhandle, self->pFileHandle, 4 );
2142 list->push_back(
ChunkInfo( 0, size, (
char*)buffer ) );
2153 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2155 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2171 const struct iovec *
iov,
2178 if( self->pFileState ==
Error )
return self->pStatus;
2184 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a write command for handle 0x%x to "
2185 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
2186 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2195 memcpy( req->
fhandle, self->pFileHandle, 4 );
2199 for(
int i = 0; i < iovcnt; ++i )
2201 if(
iov[i].iov_len == 0 )
continue;
2202 size +=
iov[i].iov_len;
2204 (
char*)
iov[i].iov_base ) );
2210 wrtreq->
dlen = size;
2211 memcpy( wrtreq->
fhandle, self->pFileHandle, 4 );
2222 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2224 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2243 const std::string &value )
2246 if( name ==
"ReadRecovery" )
2248 if( value ==
"true" ) pDoRecoverRead =
true;
2249 else pDoRecoverRead =
false;
2252 else if( name ==
"WriteRecovery" )
2254 if( value ==
"true" ) pDoRecoverWrite =
true;
2255 else pDoRecoverWrite =
false;
2258 else if( name ==
"FollowRedirects" )
2260 if( value ==
"true" ) pFollowRedirects =
true;
2261 else pFollowRedirects =
false;
2264 else if( name ==
"BundledClose" )
2266 if( value ==
"true" ) pAllowBundledClose =
true;
2267 else pAllowBundledClose =
false;
2277 std::string &value )
const
2280 if( name ==
"ReadRecovery" )
2282 if( pDoRecoverRead ) value =
"true";
2283 else value =
"false";
2286 else if( name ==
"WriteRecovery" )
2288 if( pDoRecoverWrite ) value =
"true";
2289 else value =
"false";
2292 else if( name ==
"FollowRedirects" )
2294 if( pFollowRedirects ) value =
"true";
2295 else value =
"false";
2298 else if( name ==
"DataServer" && pDataServer )
2299 { value = pDataServer->
GetHostId();
return true; }
2300 else if( name ==
"LastURL" && pDataServer )
2301 { value = pDataServer->
GetURL();
return true; }
2302 else if( name ==
"WrtRecoveryRedir" && pWrtRecoveryRedir )
2303 { value = pWrtRecoveryRedir->
GetHostId();
return true; }
2321 std::string lastServer = pFileUrl->
GetHostId();
2325 delete pLoadBalancer;
2327 delete pWrtRecoveryRedir;
2328 pWrtRecoveryRedir = 0;
2330 pDataServer =
new URL( hostList->back().url );
2334 HostList::const_iterator itC;
2336 for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2339 itC->url.GetParams(),
2344 HostList::const_reverse_iterator it;
2345 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2346 if( it->loadBalancer )
2348 pLoadBalancer =
new URL( it->url );
2352 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2355 pWrtRecoveryRedir =
new URL( it->url );
2360 log->
Debug(
FileMsg,
"[0x%x@%s] Open has returned with status %s",
2361 this, pFileUrl->
GetURL().c_str(), status->
ToStr().c_str() );
2374 isencobj.
Get( isenc );
2375 pIsChannelEncrypted = *isenc;
2384 if( !pStatus.
IsOK() || !openInfo )
2386 log->
Debug(
FileMsg,
"[0x%x@%s] Error while opening at %s: %s",
2387 this, pFileUrl->
GetURL().c_str(), lastServer.c_str(),
2388 pStatus.
ToStr().c_str() );
2389 FailQueuedMessages( pStatus );
2421 log->
Debug(
FileMsg,
"[0x%x@%s] successfully opened at %s, handle: 0x%x, "
2422 "session id: %ld",
this, pFileUrl->
GetURL().c_str(),
2423 pDataServer->
GetHostId().c_str(), *((uint32_t*)pFileHandle),
2429 gettimeofday( &pOpenTime, 0 );
2444 ReSendQueuedMessages();
2457 log->
Debug(
FileMsg,
"[0x%x@%s] Close returned from %s with: %s",
this,
2459 status->
ToStr().c_str() );
2461 log->
Dump(
FileMsg,
"[0x%x@%s] Items in the fly %d, queued for recovery %d",
2462 this, pFileUrl->
GetURL().c_str(), pInTheFly.size(),
2463 pToBeRecovered.size() );
2465 MonitorClose( status );
2466 ResetMonitoringVars();
2486 static const std::string root =
"root", xroot =
"xroot", file =
"file",
2487 roots =
"roots", xroots =
"xroots";
2489 if( !msg.compare( 0, root.size(), root ) ||
2490 !msg.compare( 0, xroot.size(), xroot ) ||
2491 !msg.compare( 0, file.size(), file ) ||
2492 !msg.compare( 0, roots.size(), roots ) ||
2493 !msg.compare( 0, xroots.size(), xroots ) )
2505 self->pInTheFly.erase( message );
2507 log->
Dump(
FileMsg,
"[0x%x@%s] File state error encountered. Message %s "
2508 "returned with %s", self.get(), self->pFileUrl->GetURL().c_str(),
2518 i.
file = self->pFileUrl;
2540 if( !self->IsRecoverable( *status ) || sendParams.
kbuff )
2542 log->
Error(
FileMsg,
"[0x%x@%s] Fatal file state error. Message %s "
2543 "returned with %s", self.get(), self->pFileUrl->GetURL().c_str(),
2546 self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2555 self->pCloseReason = *status;
2556 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2564 const std::string &redirectUrl,
2570 self->pInTheFly.erase( message );
2576 if( !self->pStateRedirect )
2578 std::ostringstream o;
2579 self->pStateRedirect =
new URL( redirectUrl );
2582 self->pStateRedirect->GetParams(),
2584 self->pFileUrl->SetParams( params );
2587 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2602 log->
Dump(
FileMsg,
"[0x%x@%s] Got state response for message %s",
2603 self.get(), self->pFileUrl->GetURL().c_str(),
2610 self->pInTheFly.erase( message );
2611 RunRecovery( self );
2626 response->
Get( info );
2627 delete self->pStatInfo;
2628 self->pStatInfo =
new StatInfo( *info );
2660 for(
size_t i = 0; i < segs; ++i )
2661 self->pVRBytes += dataChunk[i].
rlen;
2662 self->pVSegs += segs;
2695 for(
size_t i = 0; i < size; ++i )
2696 self->pVWBytes += wrtList[i].
wlen;
2718 if( !pToBeRecovered.empty() )
2721 log->
Dump(
FileMsg,
"[0x%x@%s] Got a timer event",
this,
2722 pFileUrl->
GetURL().c_str() );
2723 RequestList::iterator it;
2725 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2727 if( it->params.expires <= now )
2732 0, it->params.hostList ) );
2733 it = pToBeRecovered.erase( it );
2751 if( (IsReadOnly() && pDoRecoverRead) ||
2752 (!IsReadOnly() && pDoRecoverWrite) )
2754 log->
Debug(
FileMsg,
"[0x%x@%s] Putting the file in recovery state in "
2755 "process %d",
this, pFileUrl->
GetURL().c_str(), getpid() );
2758 pToBeRecovered.clear();
2771 if( self->pFileState !=
Opened || !self->pLoadBalancer )
2777 log->
Debug(
FileMsg,
"[0x%x@%s] Reopen file at next data server.",
2778 self.get(), self->pFileUrl->GetURL().c_str() );
2781 auto lbcgi = self->pLoadBalancer->GetParams();
2782 auto dtcgi = self->pDataServer->GetParams();
2785 auto itr = lbcgi.find(
"tried" );
2786 if( itr == lbcgi.end() )
2787 lbcgi[
"tried"] = self->pDataServer->GetHostName();
2790 std::string tried = itr->second;
2791 tried +=
"," + self->pDataServer->GetHostName();
2792 lbcgi[
"tried"] = tried;
2794 self->pLoadBalancer->SetParams( lbcgi );
2796 return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2802 template<
typename T>
2803 Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2806 const std::vector<T> &attrs,
2821 memcpy( req->
fhandle, self->pFileHandle, 4 );
2823 if( !st.
IsOK() )
return st;
2832 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2834 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2840 Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2851 return RecoverMessage( self, RequestData( msg, handler, sendParams ),
false );
2857 if( self->pFileState ==
Opened )
2860 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2867 return RecoverMessage( self, RequestData( msg, handler, sendParams ),
false );
2870 self->pInTheFly.insert(msg);
2881 bool FileStateHandler::IsRecoverable(
const XRootDStatus &status )
const
2883 const auto recoverable_errors = {
2892 if (pDoRecoverRead || pDoRecoverWrite)
2893 for (
const auto error : recoverable_errors)
2894 if (status.code == error)
2895 return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
2903 bool FileStateHandler::IsReadOnly()
const
2914 Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
2916 bool callbackOnFailure )
2921 log->
Dump(
FileMsg,
"[0x%x@%s] Putting message %s in the recovery list",
2922 self.get(), self->pFileUrl->GetURL().c_str(),
2923 rd.request->GetDescription().c_str() );
2925 Status st = RunRecovery( self );
2928 self->pToBeRecovered.push_back( rd );
2932 if( callbackOnFailure )
2933 self->FailMessage( rd, st );
2941 Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
2946 if( !self->pInTheFly.empty() )
2950 log->
Debug(
FileMsg,
"[0x%x@%s] Running the recovery procedure", self.get(),
2951 self->pFileUrl->GetURL().c_str() );
2954 if( self->pStateRedirect )
2956 SendClose( self, 0 );
2957 st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
2958 delete self->pStateRedirect; self->pStateRedirect = 0;
2960 else if( self->IsReadOnly() && self->pLoadBalancer )
2961 st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
2963 st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
2967 self->pFileState =
Error;
2969 self->FailQueuedMessages( st );
2978 XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
2986 memcpy( req->
fhandle, self->pFileHandle, 4 );
2994 params.followRedirects =
false;
2995 params.stateful =
true;
2999 return self->IssueRequest( *self->pDataServer, msg, handler, params );
3005 XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3010 log->
Dump(
FileMsg,
"[0x%x@%s] Sending a recovery open command to %s",
3011 self.get(), self->pFileUrl->GetURL().c_str(), url.
GetURL().c_str() );
3020 self->pOpenFlags &= ~kXR_delete;
3024 self->pOpenFlags &= ~kXR_new;
3031 u.
SetPath( self->pFileUrl->GetPath() );
3037 req->
mode = self->pOpenMode;
3038 req->
options = self->pOpenFlags;
3039 req->
dlen = path.length();
3040 msg->
Append( path.c_str(), path.length(), 24 );
3053 XRootDStatus st = self->IssueRequest( url, msg, openHandler, params );
3060 self->pFileState =
Closed;
3068 void FileStateHandler::FailMessage( RequestData rd,
XRootDStatus status )
3071 log->
Dump(
FileMsg,
"[0x%x@%s] Failing message %s with %s",
3072 this, pFileUrl->
GetURL().c_str(),
3073 rd.request->GetDescription().c_str(),
3074 status.
ToStr().c_str() );
3076 StatefulHandler *sh =
dynamic_cast<StatefulHandler*
>(rd.handler);
3080 log->
Error(
FileMsg,
"[0x%x@%s] Internal error while recovering %s",
3081 this, pFileUrl->
GetURL().c_str(),
3082 rd.request->GetDescription().c_str() );
3091 0, rd.params.hostList ) );
3099 void FileStateHandler::FailQueuedMessages(
XRootDStatus status )
3101 RequestList::iterator it;
3102 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3103 FailMessage( *it, status );
3104 pToBeRecovered.clear();
3110 void FileStateHandler::ReSendQueuedMessages()
3112 RequestList::iterator it;
3113 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3115 it->request->SetSessionId( pSessionId );
3116 ReWriteFileHandle( it->request );
3117 XRootDStatus st = IssueRequest( *pDataServer, it->request,
3118 it->handler, it->params );
3120 FailMessage( *it, st );
3122 pToBeRecovered.clear();
3128 void FileStateHandler::ReWriteFileHandle(
Message *msg )
3136 memcpy( req->
fhandle, pFileHandle, 4 );
3142 memcpy( req->
fhandle, pFileHandle, 4 );
3148 memcpy( req->
fhandle, pFileHandle, 4 );
3154 memcpy( req->
fhandle, pFileHandle, 4 );
3162 memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3172 for(
size_t i = 0; i < size; ++i )
3173 memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3179 memcpy( req->
fhandle, pFileHandle, 4 );
3185 memcpy( req->
fhandle, pFileHandle, 4 );
3191 log->
Dump(
FileMsg,
"[0x%x@%s] Rewritten file handle for %s to 0x%x",
3193 *((uint32_t*)pFileHandle) );
3200 void FileStateHandler::MonitorClose(
const XRootDStatus *status )
3208 gettimeofday( &i.
cTOD, 0 );
3230 sendParams, pLFileHandler );
3234 return pLFileHandler->
ExecRequest( url, msg, handler, sendParams );
3238 sendParams, pLFileHandler );
3244 XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3247 std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3260 log->
Debug(
FileMsg,
"[0x%x@%s] Sending a write command for handle 0x%x to "
3261 "%s", self.get(), self->pFileUrl->GetURL().c_str(),
3262 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3271 memcpy( req->
fhandle, self->pFileHandle, 4 );
3277 params.
kbuff = kbuff.release();
3283 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
3285 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
struct ClientPgReadRequest pgread
static const int kXR_ckpXeq
struct ClientPgWriteRequest pgwrite
struct ClientRequestHdr header
struct ClientReadRequest read
#define kXR_PROTPGRWVERSION
struct ClientWriteRequest write
static int mapError(int rc)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetSize() const
Get the size of the message.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
bool GetInt(const std::string &key, int &value)
An interface for file plug-ins.
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, uint16_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
friend class ::PgReadRetryHandler
void Tick(time_t now)
Tick.
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
FileStateHandler(FilePlugIn *&plugin)
Constructor.
friend class ::PgReadHandler
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, uint16_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Closed
The file is closed.
@ Opened
Opening has succeeded.
@ Error
Opening has failed.
@ Recovering
Recovering from an error.
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
~FileStateHandler()
Destructor.
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
friend class ::PgReadSubstitutionHandler
bool IsOpen() const
Check if the file is open.
friend class ::OpenHandler
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, uint16_t timeout)
Try other data server.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
const std::string & GetDescription() const
Get the description of the message.
An abstract class to describe the client-side monitoring plugin interface.
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
uint64_t GetSize() const
Get size (in bytes)
const std::string & GetPath() const
Get the path.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
bool IsMetalink() const
Is it a URL to a metalink.
std::map< std::string, std::string > ParamsMap
void SetParams(const std::string ¶ms)
Set params.
std::string GetPathWithFilteredParams() const
Get the path with params, filteres out 'xrdcl.'.
std::string GetURL() const
Get the URL.
void SetPath(const std::string &path)
Set the path.
const ParamsMap & GetParams() const
Get the URL params.
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInProgress
const uint16_t errSocketTimeout
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const uint16_t suAlreadyDone
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
const uint16_t errSocketError
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
const uint16_t errRedirect
const uint16_t errSocketDisconnected
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Describe a file close event.
uint64_t vwBytes
Total number of bytes written vie writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
std::string dataServer
Actual fata server.
uint16_t oFlags
OpenFlags.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted