69 friend class PgReadRetryHandler;
76 PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77 XrdCl::ResponseHandler *userHandler,
78 uint64_t orgOffset ) :
79 stateHandler( stateHandler ),
80 userHandler( userHandler ),
81 orgOffset( orgOffset ),
91 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
92 XrdCl::AnyObject *response,
95 using namespace XrdCl;
97 std::unique_lock<std::mutex> lck( mtx );
105 if( !status->
IsOK() )
122 userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
125 userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
136 if( !status->
IsOK() )
141 userHandler->HandleResponseWithHosts( status, response, hostList );
153 response->
Get( pginf );
157 std::vector<uint32_t> &cksums = pginf->
GetCksums();
158 char *buffer =
reinterpret_cast<char*
>( pginf->
GetBuffer() );
161 if( pgsize > bytesRead ) pgsize = bytesRead;
163 for(
size_t pgnb = 0; pgnb < nbpages; ++pgnb )
166 if( crcval != cksums[pgnb] )
168 Log *log = DefaultEnv::GetLog();
169 log->
Info( FileMsg,
"[%p@%s] Received corrupted page, will retry page #%zu.",
170 (
void*)
this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
185 if( pgsize > bytesRead ) pgsize = bytesRead;
194 userHandler->HandleResponseWithHosts( status, response, hostList );
203 resp.reset( response );
204 hosts.reset( hostList );
208 void UpdateCksum(
size_t pgnb, uint32_t crcval )
212 XrdCl::PageInfo *pginf = 0;
220 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221 XrdCl::ResponseHandler *userHandler;
224 std::unique_ptr<XrdCl::AnyObject> resp;
225 std::unique_ptr<XrdCl::HostList> hosts;
226 std::unique_ptr<XrdCl::XRootDStatus> st;
242 PgReadRetryHandler( PgReadHandler *pgReadHandler,
size_t pgnb ) : pgReadHandler( pgReadHandler ),
251 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252 XrdCl::AnyObject *response,
255 using namespace XrdCl;
257 if( !status->
IsOK() )
259 Log *log = DefaultEnv::GetLog();
260 log->
Info( FileMsg,
"[%p@%s] Failed to recover page #%zu.",
261 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262 pgReadHandler->HandleResponseWithHosts( status, response, hostList );
267 XrdCl::PageInfo *pginf = 0;
268 response->
Get( pginf );
271 Log *log = DefaultEnv::GetLog();
272 log->
Info( FileMsg,
"[%p@%s] Failed to recover page #%zu.",
273 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
275 DeleteArgs( status, response, hostList );
276 pgReadHandler->HandleResponseWithHosts(
new XRootDStatus( stError, errDataError ), 0, 0 );
282 if( crcval != pginf->
GetCksums().front() )
284 Log *log = DefaultEnv::GetLog();
285 log->
Info( FileMsg,
"[%p@%s] Failed to recover page #%zu.",
286 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287 DeleteArgs( status, response, hostList );
288 pgReadHandler->HandleResponseWithHosts(
new XRootDStatus( stError, errDataError ), 0, 0 );
293 Log *log = DefaultEnv::GetLog();
294 log->
Info( FileMsg,
"[%p@%s] Successfully recovered page #%zu.",
295 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
297 DeleteArgs( 0, response, hostList );
298 pgReadHandler->UpdateCksum( pgnb, crcval );
299 pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
305 inline void DeleteArgs( XrdCl::XRootDStatus *status,
306 XrdCl::AnyObject *response,
314 PgReadHandler *pgReadHandler;
328 PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329 XrdCl::ResponseHandler *userHandler ) :
330 stateHandler( stateHandler ),
331 userHandler( userHandler )
338 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339 XrdCl::AnyObject *rdresp,
342 using namespace XrdCl;
344 if( !status->
IsOK() )
346 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
352 ChunkInfo *chunk =
nullptr;
353 rdresp->
Get( chunk );
357 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
362 std::vector<uint32_t> cksums;
363 if( stateHandler->pIsChannelEncrypted )
368 cksums.reserve( nbpages );
370 size_t size = chunk->
length;
371 char *buffer =
reinterpret_cast<char*
>( chunk->
buffer );
373 for(
size_t pg = 0; pg < nbpages; ++pg )
376 if( pgsize > size ) pgsize = size;
378 cksums.push_back( crcval );
384 PageInfo *pages =
new PageInfo( chunk->
offset, chunk->
length,
385 chunk->
buffer, std::move( cksums ) );
387 AnyObject *response =
new AnyObject();
388 response->
Set( pages );
389 userHandler->HandleResponseWithHosts( status, response, hostList );
396 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
397 XrdCl::ResponseHandler *userHandler;
410 OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
411 XrdCl::ResponseHandler *userHandler ):
412 pStateHandler( stateHandler ),
413 pUserHandler( userHandler )
420 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
421 XrdCl::AnyObject *response,
424 using namespace XrdCl;
429 OpenInfo *openInfo = 0;
431 response->
Get( openInfo );
437 if( status->
code == errRedirect )
440 EcHandler *ecHandler =
GetEcHandler( hostList->front().url, ecurl );
441 if( ecHandler && pStateHandler->NeedFileTempl() )
444 status =
new XRootDStatus( stError, errNotSupported, 0,
445 "File template not supported with Ec" );
451 pStateHandler->pPlugin = ecHandler;
452 ecHandler->
Open( pStateHandler->pOpenFlags, pUserHandler, 0 );
460 pStateHandler->OnOpen( status, openInfo, hostList );
463 pUserHandler->HandleResponseWithHosts( status, 0, hostList );
473 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
474 XrdCl::ResponseHandler *pUserHandler;
487 CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
488 XrdCl::ResponseHandler *userHandler,
489 XrdCl::Message *message ):
490 pStateHandler( stateHandler ),
491 pUserHandler( userHandler ),
499 virtual ~CloseHandler()
507 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
508 XrdCl::AnyObject *response,
511 pStateHandler->OnClose( status );
513 pUserHandler->HandleResponseWithHosts( status, response, hostList );
525 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
526 XrdCl::ResponseHandler *pUserHandler;
527 XrdCl::Message *pMessage;
539 StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
540 XrdCl::ResponseHandler *userHandler,
541 XrdCl::Message *message,
542 const XrdCl::MessageSendParams &sendParams ):
543 pStateHandler( stateHandler ),
544 pUserHandler( userHandler ),
546 pSendParams( sendParams )
553 virtual ~StatefulHandler()
556 delete pSendParams.chunkList;
557 delete pSendParams.kbuff;
563 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
564 XrdCl::AnyObject *response,
567 using namespace XrdCl;
568 std::unique_ptr<AnyObject> responsePtr( response );
569 pSendParams.hostList = hostList;
574 if( !status->
IsOK() )
583 responsePtr.release();
586 pUserHandler->HandleResponseWithHosts( status, response, hostList );
599 XrdCl::ResponseHandler *GetUserHandler()
605 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
606 XrdCl::ResponseHandler *pUserHandler;
607 XrdCl::Message *pMessage;
608 XrdCl::MessageSendParams pSendParams;
621 ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
622 buffer( std::move( buffer ) ),
630 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
631 XrdCl::AnyObject *response,
635 handler->HandleResponseWithHosts( status, response, hostList );
641 XrdCl::Buffer& GetBuffer()
647 XrdCl::Buffer buffer;
648 XrdCl::ResponseHandler *handler;
664 pWrtRecoveryRedir( 0 ),
669 pDoRecoverRead( true ),
670 pDoRecoverWrite( true ),
671 pFollowRedirects( true ),
672 pUseVirtRedirector( true ),
673 pIsChannelEncrypted( false ),
674 pAllowBundledClose( false ),
677 pFileHandle =
new uint8_t[4];
678 ResetMonitoringVars();
697 pWrtRecoveryRedir( 0 ),
702 pDoRecoverRead( true ),
703 pDoRecoverWrite( true ),
704 pFollowRedirects( true ),
705 pUseVirtRedirector( useVirtRedirector ),
706 pAllowBundledClose( false ),
709 pFileHandle =
new uint8_t[4];
710 ResetMonitoringVars();
741 ResetMonitoringVars();
746 if(
DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
755 delete pLoadBalancer;
756 delete [] pFileHandle;
757 delete pLFileHandler;
764 std::shared_ptr<FileStateHandler> &self,
766 const std::string &url,
781 return OpenImpl( self, url, flags, mode, handler, timeout );
788 const std::string &url,
794 self->pTemplateFileWp.reset();
795 return OpenImpl( self, url, flags, mode, handler, timeout );
801 XRootDStatus FileStateHandler::OpenImpl( std::shared_ptr<FileStateHandler> &self,
802 const std::string &url,
813 if( self->pFileState ==
Error )
814 return self->pStatus;
832 if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
835 registry.
Release( *self->pFileUrl );
837 delete self->pFileUrl;
841 self->pFileUrl =
new URL( url );
849 char requuid[37]= {0};
850 uuid_generate( uuid );
851 uuid_unparse( uuid, requuid );
852 cgi[
"xrdcl.requuid"] = requuid;
853 self->pFileUrl->SetParams( cgi );
855 if( !self->pFileUrl->IsValid() )
857 log->
Error(
FileMsg,
"[%p@%s] Trying to open invalid url: %s",
858 (
void*)self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
860 self->pFileState =
Closed;
861 return self->pStatus;
868 URL::ParamsMap::const_iterator it;
869 it = urlParams.find(
"xrdcl.recover-reads" );
870 if( (it != urlParams.end() && it->second ==
"false") ||
871 !self->pDoRecoverRead )
873 self->pDoRecoverRead =
false;
874 log->
Debug(
FileMsg,
"[%p@%s] Read recovery procedures are disabled",
875 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
878 it = urlParams.find(
"xrdcl.recover-writes" );
879 if( (it != urlParams.end() && it->second ==
"false") ||
880 !self->pDoRecoverWrite )
882 self->pDoRecoverWrite =
false;
883 log->
Debug(
FileMsg,
"[%p@%s] Write recovery procedures are disabled",
884 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
890 log->
Debug(
FileMsg,
"[%p@%s] Sending an open command", (
void*)self.get(),
891 self->pFileUrl->GetObfuscatedURL().c_str() );
893 self->pOpenMode = mode;
894 self->pOpenFlags = flags;
898 ClientOpenRequest *req;
899 std::string path = self->pFileUrl->GetPathWithFilteredParams();
905 req->
dlen = path.length();
907 XRootDStatus st = FillFhTempl( self, *self->pFileUrl, msg, sendUrl );
912 self->pFileState =
Closed;
915 msg->
Append( path.c_str(), path.length(), 24 );
918 MessageSendParams params; params.
timeout = timeout;
922 st = self->IssueRequest( sendUrl, msg, openHandler, params );
928 self->pFileState =
Closed;
946 if( self->pFileState ==
Error )
947 return self->pStatus;
952 if( self->pFileState ==
Closed )
958 if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
964 log->
Debug(
FileMsg,
"[%p@%s] Sending a close command for handle %#x to %s",
965 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
966 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
976 memcpy( req->
fhandle, self->pFileHandle, 4 );
980 CloseHandler *closeHandler =
new CloseHandler( self, handler, msg );
987 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
997 self->pFileState =
Closed;
1004 delete closeHandler;
1006 self->pFileState =
Error;
1022 if( self->pFileState ==
Error )
return self->pStatus;
1040 log->
Debug(
FileMsg,
"[%p@%s] Sending a stat command for handle %#x to %s",
1041 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1042 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1051 std::string path = self->pFileUrl->GetPath();
1055 memcpy( req->
fhandle, self->pFileHandle, 4 );
1064 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1066 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1082 if( self->pFileState ==
Error )
return self->pStatus;
1088 log->
Debug(
FileMsg,
"[%p@%s] Sending an read+preread command for handle %#x to %s",
1089 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1090 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1102 memcpy( req->
fhandle, self->pFileHandle, 4 );
1105 static char dummyBuff[8];
1107 list->push_back(
ChunkInfo( 0, 0, dummyBuff ) );
1113 for(
size_t i = 0; i < tracts.size(); ++i )
1115 dataTract[i].
rlen = tracts[i].length;
1116 dataTract[i].
offset = tracts[i].offset;
1117 memcpy( dataTract[i].fhandle, req->
fhandle, 4 );
1131 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1133 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1148 if( self->pFileState ==
Error )
return self->pStatus;
1154 log->
Debug(
FileMsg,
"[%p@%s] Sending a read command for handle %#x to %s",
1155 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1156 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1165 memcpy( req->
fhandle, self->pFileHandle, 4 );
1168 list->push_back(
ChunkInfo( offset, size, buffer ) );
1177 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1179 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1192 int issupported =
true;
1205 issupported =
false;
1210 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1212 auto st =
Read( self, offset, size, buffer, substitHandler, timeout );
1213 if( !st.
IsOK() )
delete substitHandler;
1219 if( !st.
IsOK() )
delete pgHandler;
1233 "PgRead retry size exceeded 4KB." );
1237 if( !st.
IsOK() )
delete retryHandler;
1251 if( self->pFileState ==
Error )
return self->pStatus;
1257 log->
Debug(
FileMsg,
"[%p@%s] Sending a pgread command for handle %#x to %s",
1258 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1259 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1268 memcpy( req->
fhandle, self->pFileHandle, 4 );
1281 list->push_back(
ChunkInfo( offset, size, buffer ) );
1290 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1292 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1307 if( self->pFileState ==
Error )
return self->pStatus;
1313 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
1314 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1315 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1324 memcpy( req->
fhandle, self->pFileHandle, 4 );
1327 list->push_back(
ChunkInfo( 0, size, (
char*)buffer ) );
1338 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1340 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1359 log->
Info(
FileMsg,
"[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1360 "cannot convert it to kernel space buffer.", (
void*)self.get(),
1361 self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1363 void *buff = buffer.GetBuffer();
1364 uint32_t size = buffer.GetSize();
1365 ReleaseBufferHandler *wrtHandler =
1366 new ReleaseBufferHandler( std::move( buffer ), handler );
1367 XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1370 buffer = std::move( wrtHandler->GetBuffer() );
1379 uint32_t length = buffer.GetSize();
1380 char *ubuff = buffer.Release();
1390 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1408 ssize_t ret = fdoff ?
XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1416 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1426 std::vector<uint32_t> &cksums,
1443 if( cksums.empty() )
1445 const char *data =
static_cast<const char*
>( buffer );
1451 if( crc32cCnt != cksums.size() )
1474 static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1476 if( pgoff == offset )
return 0;
1482 if( !status ) status = s;
1489 auto pgwrt = std::make_shared<pgwrt_t>( handler );
1493 uint32_t fstpglen = fLen;
1495 time_t start = ::time(
nullptr );
1498 std::unique_ptr<AnyObject> scoped( r );
1503 pgwrt->SetStatus( s );
1512 pgwrt->SetStatus( s );
1517 time_t elapsed = ::time(
nullptr ) - start;
1518 if( elapsed >= timeout )
1523 else timeout -= elapsed;
1525 for(
size_t i = 0; i < inf->
Size(); ++i )
1527 auto tpl = inf->
At( i );
1528 uint64_t pgoff = std::get<0>( tpl );
1529 uint32_t pglen = std::get<1>( tpl );
1530 const void *pgbuf =
static_cast<const char*
>( buffer ) + ( pgoff - offset );
1531 uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1534 std::unique_ptr<AnyObject> scoped( r );
1538 pgwrt->SetStatus( s );
1548 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (
void*)self.get(),
1549 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1551 "Failed to retransmit corrupted page" ) );
1555 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (
void*)self.get(),
1556 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1558 auto st =
PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1559 if( !st.IsOK() ) pgwrt->SetStatus(
new XRootDStatus( st ) );
1561 "pgoff=%llu, pglen=%u, pgdigest=%u", (
void*)self.get(),
1562 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1566 auto st =
PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1569 pgwrt->handler =
nullptr;
1586 std::vector<uint32_t> cksums{ digest };
1597 std::vector<uint32_t> &cksums,
1604 if( self->pFileState ==
Error )
return self->pStatus;
1610 log->
Debug(
FileMsg,
"[%p@%s] Sending a pgwrite command for handle %#x to %s",
1611 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1612 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1623 req->
dlen = size + cksums.size() *
sizeof( uint32_t );
1625 memcpy( req->
fhandle, self->pFileHandle, 4 );
1628 list->push_back(
ChunkInfo( offset, size, (
char*)buffer ) );
1640 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1642 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1654 if( self->pFileState ==
Error )
return self->pStatus;
1660 log->
Debug(
FileMsg,
"[%p@%s] Sending a sync command for handle %#x to %s",
1661 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1662 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1669 memcpy( req->
fhandle, self->pFileHandle, 4 );
1678 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1680 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1693 if( self->pFileState ==
Error )
return self->pStatus;
1699 log->
Debug(
FileMsg,
"[%p@%s] Sending a truncate command for handle %#x to %s",
1700 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1701 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1708 memcpy( req->
fhandle, self->pFileHandle, 4 );
1718 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1720 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1737 if( self->pFileState ==
Error )
return self->pStatus;
1743 log->
Debug(
FileMsg,
"[%p@%s] Sending a vector read command for handle %#x to %s",
1744 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1745 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1758 char *cursor = (
char*)buffer;
1764 for(
size_t i = 0; i < chunks.size(); ++i )
1766 dataChunk[i].
rlen = chunks[i].length;
1767 dataChunk[i].
offset = chunks[i].offset;
1768 memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1773 chunkBuffer = cursor;
1774 cursor += chunks[i].length;
1777 chunkBuffer = chunks[i].buffer;
1779 list->push_back(
ChunkInfo( chunks[i].offset,
1795 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1797 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1813 if( self->pFileState ==
Error )
return self->pStatus;
1819 log->
Debug(
FileMsg,
"[%p@%s] Sending a vector write command for handle %#x to %s",
1820 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1821 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1850 for(
size_t i = 0; i < chunks.size(); ++i )
1852 writeList[i].
wlen = chunks[i].length;
1853 writeList[i].
offset = chunks[i].offset;
1854 memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1856 list->push_back(
ChunkInfo( chunks[i].offset,
1858 chunks[i].buffer ) );
1872 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1874 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1882 const struct iovec *
iov,
1889 if( self->pFileState ==
Error )
return self->pStatus;
1895 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
1896 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1897 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1906 for(
int i = 0; i < iovcnt; ++i )
1908 if(
iov[i].iov_len == 0 )
continue;
1909 size +=
iov[i].iov_len;
1911 (
char*)
iov[i].iov_base ) );
1917 memcpy( req->
fhandle, self->pFileHandle, 4 );
1928 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1930 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1945 if( self->pFileState ==
Error )
return self->pStatus;
1951 log->
Debug(
FileMsg,
"[%p@%s] Sending a read command for handle %#x to %s",
1952 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1953 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1960 size_t size = std::accumulate(
iov,
iov + iovcnt, 0, [](
size_t acc, iovec &rhs )
1962 return acc + rhs.iov_len;
1968 memcpy( req->
fhandle, self->pFileHandle, 4 );
1971 list->reserve( iovcnt );
1972 uint64_t choff = offset;
1973 for(
int i = 0; i < iovcnt; ++i )
1975 list->emplace_back( choff,
iov[i].iov_len,
iov[i].iov_base );
1976 choff +=
iov[i].iov_len;
1986 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1988 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2004 if( self->pFileState ==
Error )
return self->pStatus;
2010 log->
Debug(
FileMsg,
"[%p@%s] Sending a fcntl command for handle %#x to %s",
2011 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2012 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2021 memcpy( req->
fhandle, self->pFileHandle, 4 );
2031 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2033 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2045 if( self->pFileState ==
Error )
return self->pStatus;
2051 log->
Debug(
FileMsg,
"[%p@%s] Sending a visa command for handle %#x to %s",
2052 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2053 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2061 memcpy( req->
fhandle, self->pFileHandle, 4 );
2070 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2072 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2079 const std::vector<xattr_t> &attrs,
2085 if( self->pFileState ==
Error )
return self->pStatus;
2091 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr set command for handle %#x to %s",
2092 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2093 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2098 return XAttrOperationImpl( self,
kXR_fattrSet, 0, attrs, handler, timeout );
2105 const std::vector<std::string> &attrs,
2111 if( self->pFileState ==
Error )
return self->pStatus;
2117 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr get command for handle %#x to %s",
2118 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2119 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2124 return XAttrOperationImpl( self,
kXR_fattrGet, 0, attrs, handler, timeout );
2131 const std::vector<std::string> &attrs,
2137 if( self->pFileState ==
Error )
return self->pStatus;
2143 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr del command for handle %#x to %s",
2144 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2145 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2150 return XAttrOperationImpl( self,
kXR_fattrDel, 0, attrs, handler, timeout );
2162 if( self->pFileState ==
Error )
return self->pStatus;
2168 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr list command for handle %#x to %s",
2169 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2170 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2175 static const std::vector<std::string> nothing;
2177 nothing, handler, timeout );
2198 if( self->pFileState ==
Error )
return self->pStatus;
2204 log->
Debug(
FileMsg,
"[%p@%s] Sending a checkpoint command for handle %#x to %s",
2205 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2206 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2214 memcpy( req->
fhandle, self->pFileHandle, 4 );
2224 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2226 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2249 if( self->pFileState ==
Error )
return self->pStatus;
2255 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
2256 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2257 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2266 memcpy( req->
fhandle, self->pFileHandle, 4 );
2271 wrtreq->
dlen = size;
2272 memcpy( wrtreq->
fhandle, self->pFileHandle, 4 );
2275 list->push_back(
ChunkInfo( 0, size, (
char*)buffer ) );
2286 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2288 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2304 const struct iovec *
iov,
2311 if( self->pFileState ==
Error )
return self->pStatus;
2317 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
2318 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2319 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2328 memcpy( req->
fhandle, self->pFileHandle, 4 );
2332 for(
int i = 0; i < iovcnt; ++i )
2334 if(
iov[i].iov_len == 0 )
continue;
2335 size +=
iov[i].iov_len;
2337 (
char*)
iov[i].iov_base ) );
2343 wrtreq->
dlen = size;
2344 memcpy( wrtreq->
fhandle, self->pFileHandle, 4 );
2355 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2357 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2376 const std::string &value )
2379 if( name ==
"ReadRecovery" )
2381 if( value ==
"true" ) pDoRecoverRead =
true;
2382 else pDoRecoverRead =
false;
2385 else if( name ==
"WriteRecovery" )
2387 if( value ==
"true" ) pDoRecoverWrite =
true;
2388 else pDoRecoverWrite =
false;
2391 else if( name ==
"FollowRedirects" )
2393 if( value ==
"true" ) pFollowRedirects =
true;
2394 else pFollowRedirects =
false;
2397 else if( name ==
"BundledClose" )
2399 if( value ==
"true" ) pAllowBundledClose =
true;
2400 else pAllowBundledClose =
false;
2410 std::string &value )
const
2413 if( name ==
"ReadRecovery" )
2415 if( pDoRecoverRead ) value =
"true";
2416 else value =
"false";
2419 else if( name ==
"WriteRecovery" )
2421 if( pDoRecoverWrite ) value =
"true";
2422 else value =
"false";
2425 else if( name ==
"FollowRedirects" )
2427 if( pFollowRedirects ) value =
"true";
2428 else value =
"false";
2431 else if( name ==
"DataServer" && pDataServer )
2432 { value = pDataServer->GetHostId();
return true; }
2433 else if( name ==
"LastURL" && pDataServer )
2434 { value = pDataServer->GetURL();
return true; }
2435 else if( name ==
"WrtRecoveryRedir" && pWrtRecoveryRedir )
2436 { value = pWrtRecoveryRedir->GetHostId();
return true; }
2454 std::string lastServer = pFileUrl->GetHostId();
2458 delete pLoadBalancer;
2460 delete pWrtRecoveryRedir;
2461 pWrtRecoveryRedir = 0;
2463 pDataServer =
new URL( hostList->back().url );
2464 pDataServer->SetParams( pFileUrl->GetParams() );
2465 if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2466 lastServer = pDataServer->GetHostId();
2467 HostList::const_iterator itC;
2469 for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2472 itC->url.GetParams(),
2475 pDataServer->SetParams( params );
2477 HostList::const_reverse_iterator it;
2478 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2479 if( it->loadBalancer )
2481 pLoadBalancer =
new URL( it->url );
2485 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2488 pWrtRecoveryRedir =
new URL( it->url );
2493 log->
Debug(
FileMsg,
"[%p@%s] Open has returned with status %s",
2494 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), status->
ToStr().c_str() );
2496 if( pDataServer && !pDataServer->IsLocalFile() )
2507 isencobj.
Get( isenc );
2508 pIsChannelEncrypted = isenc ? *isenc :
false;
2517 if( !pStatus.IsOK() || !openInfo )
2519 log->
Debug(
FileMsg,
"[%p@%s] Error while opening at %s: %s",
2520 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2521 pStatus.ToStr().c_str() );
2522 FailQueuedMessages( pStatus );
2559 log->
Debug(
FileMsg,
"[%p@%s] successfully opened at %s, handle: %#x, "
2560 "session id: %llu", (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(),
2561 pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2562 (
unsigned long long) pSessionId );
2567 gettimeofday( &pOpenTime, 0 );
2576 i.
fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2583 ReSendQueuedMessages();
2596 log->
Debug(
FileMsg,
"[%p@%s] Close returned from %s with: %s", (
void*)
this,
2597 pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2598 status->
ToStr().c_str() );
2600 log->
Dump(
FileMsg,
"[%p@%s] Items in the fly %zu, queued for recovery %zu",
2601 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2603 MonitorClose( status );
2604 ResetMonitoringVars();
2624 static const std::string root =
"root", xroot =
"xroot", file =
"file",
2625 roots =
"roots", xroots =
"xroots";
2627 if( !msg.compare( 0, root.size(), root ) ||
2628 !msg.compare( 0, xroot.size(), xroot ) ||
2629 !msg.compare( 0, file.size(), file ) ||
2630 !msg.compare( 0, roots.size(), roots ) ||
2631 !msg.compare( 0, xroots.size(), xroots ) )
2643 self->pInTheFly.erase( message );
2645 log->
Dump(
FileMsg,
"[%p@%s] File state error encountered. Message %s "
2646 "returned with %s", (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2656 i.
file = self->pFileUrl;
2678 if( !self->IsRecoverable( *status ) || sendParams.
kbuff )
2680 log->
Error(
FileMsg,
"[%p@%s] Fatal file state error. Message %s "
2681 "returned with %s", (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2684 self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2693 self->pCloseReason = *status;
2694 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2702 const std::string &redirectUrl,
2708 self->pInTheFly.erase( message );
2714 if( !self->pStateRedirect )
2716 std::ostringstream o;
2717 self->pStateRedirect =
new URL( redirectUrl );
2720 self->pStateRedirect->GetParams(),
2722 self->pFileUrl->SetParams( params );
2725 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2740 log->
Dump(
FileMsg,
"[%p@%s] Got state response for message %s",
2741 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2748 self->pInTheFly.erase( message );
2749 RunRecovery( self );
2764 response->
Get( info );
2765 delete self->pStatInfo;
2766 self->pStatInfo =
new StatInfo( *info );
2798 for(
size_t i = 0; i < segs; ++i )
2799 self->pVRBytes += dataChunk[i].
rlen;
2800 self->pVSegs += segs;
2833 for(
size_t i = 0; i < size; ++i )
2834 self->pVWBytes += wrtList[i].
wlen;
2845 if (pMutex.CondLock())
2856 if( !pToBeRecovered.empty() )
2859 log->
Dump(
FileMsg,
"[%p@%s] Got a timer event", (
void*)
this,
2860 pFileUrl->GetObfuscatedURL().c_str() );
2861 RequestList::iterator it;
2863 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2865 if( it->params.expires <= now )
2870 0, it->params.hostList ) );
2871 it = pToBeRecovered.erase( it );
2889 if( (IsReadOnly() && pDoRecoverRead) ||
2890 (!IsReadOnly() && pDoRecoverWrite) )
2892 log->
Debug(
FileMsg,
"[%p@%s] Putting the file in recovery state in "
2893 "process %d", (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2896 pToBeRecovered.clear();
2909 if( self->pFileState !=
Opened || !self->pLoadBalancer )
2915 log->
Debug(
FileMsg,
"[%p@%s] Reopen file at next data server.",
2916 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2919 auto lbcgi = self->pLoadBalancer->GetParams();
2920 auto dtcgi = self->pDataServer->GetParams();
2923 auto itr = lbcgi.find(
"tried" );
2924 if( itr == lbcgi.end() )
2925 lbcgi[
"tried"] = self->pDataServer->GetHostName();
2928 std::string tried = itr->second;
2929 tried +=
"," + self->pDataServer->GetHostName();
2930 lbcgi[
"tried"] = tried;
2932 self->pLoadBalancer->SetParams( lbcgi );
2934 return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2940 template<
typename T>
2941 Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2944 const std::vector<T> &attrs,
2959 memcpy( req->
fhandle, self->pFileHandle, 4 );
2961 if( !st.
IsOK() )
return st;
2970 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2972 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2978 Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2989 return RecoverMessage( self, RequestData( msg, handler, sendParams ),
false );
2995 if( self->pFileState ==
Opened )
2998 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
3005 return RecoverMessage( self, RequestData( msg, handler, sendParams ),
false );
3008 self->pInTheFly.insert(msg);
3019 bool FileStateHandler::IsRecoverable(
const XRootDStatus &status )
const
3021 const auto recoverable_errors = {
3030 if (pDoRecoverRead || pDoRecoverWrite)
3031 for (
const auto error : recoverable_errors)
3032 if (status.
code == error)
3033 return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
3041 bool FileStateHandler::IsReadOnly()
const
3054 Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
3056 bool callbackOnFailure )
3061 log->
Dump(
FileMsg,
"[%p@%s] Putting message %s in the recovery list",
3062 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3063 rd.request->GetObfuscatedDescription().c_str() );
3065 Status st = RunRecovery( self );
3068 self->pToBeRecovered.push_back( rd );
3072 if( callbackOnFailure )
3073 self->FailMessage( rd, st );
3081 Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
3086 if( !self->pInTheFly.empty() )
3090 log->
Debug(
FileMsg,
"[%p@%s] Running the recovery procedure", (
void*)self.get(),
3091 self->pFileUrl->GetObfuscatedURL().c_str() );
3094 if( self->pStateRedirect )
3096 SendClose( self, 0 );
3097 st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
3098 delete self->pStateRedirect; self->pStateRedirect = 0;
3100 else if( self->IsReadOnly() && self->pLoadBalancer )
3101 st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
3103 st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
3107 self->pFileState =
Error;
3109 self->FailQueuedMessages( st );
3118 XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
3122 ClientCloseRequest *req;
3126 memcpy( req->
fhandle, self->pFileHandle, 4 );
3131 [self]( XRootDStatus&, AnyObject& )
mutable { self.reset(); } );
3132 MessageSendParams params;
3134 params.followRedirects =
false;
3135 params.stateful =
true;
3139 return self->IssueRequest( *self->pDataServer, msg, handler, params );
3145 XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3150 log->
Dump(
FileMsg,
"[%p@%s] Sending a recovery open command to %s",
3151 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.
GetObfuscatedURL().c_str() );
3160 self->pOpenFlags &= ~OpenFlags::Delete;
3164 self->pOpenFlags &= ~OpenFlags::New;
3167 ClientOpenRequest *req;
3171 u.
SetPath( self->pFileUrl->GetPath() );
3177 req->
mode = self->pOpenMode;
3178 req->
options = (self->pOpenFlags & 0xffff);
3179 req->
dlen = path.length();
3181 XRootDStatus st = FillFhTempl( self, url, msg, sendUrl );
3185 self->pFileState =
Closed;
3188 msg->
Append( path.c_str(), path.length(), 24 );
3194 MessageSendParams params; params.
timeout = timeout;
3201 st = self->IssueRequest( sendUrl, msg, openHandler, params );
3208 self->pFileState =
Closed;
3216 void FileStateHandler::FailMessage( RequestData rd,
XRootDStatus status )
3219 log->
Dump(
FileMsg,
"[%p@%s] Failing message %s with %s",
3220 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(),
3221 rd.request->GetObfuscatedDescription().c_str(),
3222 status.
ToStr().c_str() );
3224 StatefulHandler *sh =
dynamic_cast<StatefulHandler*
>(rd.handler);
3228 log->
Error(
FileMsg,
"[%p@%s] Internal error while recovering %s",
3229 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(),
3230 rd.request->GetObfuscatedDescription().c_str() );
3235 ResponseHandler *userHandler = sh->GetUserHandler();
3238 new XRootDStatus( status ),
3239 0, rd.params.hostList ) );
3247 void FileStateHandler::FailQueuedMessages(
XRootDStatus status )
3249 RequestList::iterator it;
3250 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3251 FailMessage( *it, status );
3252 pToBeRecovered.clear();
3258 void FileStateHandler::ReSendQueuedMessages()
3260 RequestList::iterator it;
3261 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3263 it->request->SetSessionId( pSessionId );
3264 ReWriteFileHandle( it->request );
3265 XRootDStatus st = IssueRequest( *pDataServer, it->request,
3266 it->handler, it->params );
3268 FailMessage( *it, st );
3270 pToBeRecovered.clear();
3276 void FileStateHandler::ReWriteFileHandle(
Message *msg )
3278 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->
GetBuffer();
3283 ClientReadRequest *req = (ClientReadRequest*)msg->
GetBuffer();
3284 memcpy( req->
fhandle, pFileHandle, 4 );
3289 ClientWriteRequest *req = (ClientWriteRequest*)msg->
GetBuffer();
3290 memcpy( req->
fhandle, pFileHandle, 4 );
3295 ClientSyncRequest *req = (ClientSyncRequest*)msg->
GetBuffer();
3296 memcpy( req->
fhandle, pFileHandle, 4 );
3301 ClientTruncateRequest *req = (ClientTruncateRequest*)msg->
GetBuffer();
3302 memcpy( req->
fhandle, pFileHandle, 4 );
3307 ClientReadVRequest *req = (ClientReadVRequest*)msg->
GetBuffer();
3308 readahead_list *dataChunk = (readahead_list*)msg->
GetBuffer( 24 );
3309 for(
size_t i = 0; i < req->
dlen/
sizeof(readahead_list); ++i )
3310 memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3315 ClientWriteVRequest *req =
3316 reinterpret_cast<ClientWriteVRequest*
>( msg->
GetBuffer() );
3317 XrdProto::write_list *wrtList =
3318 reinterpret_cast<XrdProto::write_list*
>( msg->
GetBuffer( 24 ) );
3319 size_t size = req->
dlen /
sizeof(XrdProto::write_list);
3320 for(
size_t i = 0; i < size; ++i )
3321 memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3326 ClientPgReadRequest *req = (ClientPgReadRequest*) msg->
GetBuffer();
3327 memcpy( req->
fhandle, pFileHandle, 4 );
3332 ClientPgWriteRequest *req = (ClientPgWriteRequest*) msg->
GetBuffer();
3333 memcpy( req->
fhandle, pFileHandle, 4 );
3339 log->
Dump(
FileMsg,
"[%p@%s] Rewritten file handle for %s to %#x",
3341 *((uint32_t*)pFileHandle) );
3348 void FileStateHandler::MonitorClose(
const XRootDStatus *status )
3353 Monitor::CloseInfo i;
3356 gettimeofday( &i.
cTOD, 0 );
3378 sendParams, pLFileHandler );
3382 return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3386 sendParams, pLFileHandler );
3392 XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3395 std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3402 XrdSysMutexHelper scopedLock( self->pMutex );
3408 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
3409 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3410 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3413 ClientWriteRequest *req;
3419 memcpy( req->
fhandle, self->pFileHandle, 4 );
3421 MessageSendParams params;
3425 params.
kbuff = kbuff.release();
3431 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
3433 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3441 std::shared_ptr<FileStateHandler> &self,
3444 ClientOpenRequest *req = (ClientOpenRequest*)msg->
GetBuffer();
3447 if( !self->NeedFileTempl() )
3450 return XRootDStatus();
3453 using wp = std::weak_ptr<FileStateHandler>;
3454 if( !self->pTemplateFileWp.owner_before(wp{}) &&
3455 !wp{}.owner_before(self->pTemplateFileWp) )
3459 "File flags required a template file" );
3468 std::shared_ptr<FileStateHandler> tfp = self->pTemplateFileWp.lock();
3471 "Template file object does not exist" );
3473 XrdSysMutexHelper scopedLock( tfp->pMutex );
3475 if( tfp->pFileState !=
Opened )
3477 "Template file not open" );
3479 if (!tfp->pDataServer || !tfp->pFileHandle)
3481 "Template file not connected" );
3483 sendUrl.
SetHostPort( tfp->pDataServer->GetHostName(),tfp->pDataServer->GetPort() );
3484 sendUrl.
SetUserName( tfp->pDataServer->GetUserName() );
3491 return XRootDStatus();
3504 if( self->pFileState ==
Error )
return self->pStatus;
3513 log->
Debug(
FileMsg,
"[%p@%s] Sending a clone command for handle %#x to %s",
3514 self.get(), self->pFileUrl->GetURL().c_str(),
3515 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3526 memcpy( req->
fhandle, self->pFileHandle, 4 );
3534 "Template file not available" );
3539 "Template file invalid" );
3544 "Template file object does not exist" );
3547 if( tfp->pFileState !=
Opened )
3549 "Template file not open" );
3551 if( tfp->pSessionId != self->pSessionId )
3553 "Clone source not at same location as destination" );
3555 memcpy( cl[idx].srcFH, tfp->pFileHandle, 4 );
3556 cl[idx].
srcOffs = loc.srcOffs;
3557 cl[idx].
srcLen = loc.srcLen;
3558 cl[idx].
dstOffs = loc.dstOffs;
3568 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
3570 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
struct ClientPgReadRequest pgread
static const int kXR_ckpXeq
struct ClientPgWriteRequest pgwrite
struct ClientRequestHdr header
struct ClientReadRequest read
#define kXR_PROTPGRWVERSION
struct ClientWriteRequest write
static int mapError(int rc)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetSize() const
Get the size of the message.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, time_t timeout)
bool GetInt(const std::string &key, int &value)
An interface for file plug-ins.
std::weak_ptr< FileStateHandler > pTemplateFileWp
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, time_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, time_t timeout)
Try other data server.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Clone(std::shared_ptr< FileStateHandler > &self, const CloneLocations &locs, ResponseHandler *handler, time_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
friend class ::PgReadRetryHandler
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
void Tick(time_t now)
Tick.
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, time_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, time_t timeout=0)
FileStateHandler(FilePlugIn *&plugin)
Constructor.
friend class ::PgReadHandler
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Closed
The file is closed.
@ Opened
Opening has succeeded.
@ Error
Opening has failed.
@ Recovering
Recovering from an error.
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus OpenUsingTemplate(std::shared_ptr< FileStateHandler > &self, ExportedFileTemplate *templ, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PreRead(std::shared_ptr< FileStateHandler > &self, const TractList &tracts, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
~FileStateHandler()
Destructor.
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, time_t timeout=0)
friend class ::PgReadSubstitutionHandler
bool IsOpen() const
Check if the file is open.
friend class ::OpenHandler
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
An abstract class to describe the client-side monitoring plugin interface.
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
const std::string & GetPath() const
Get the path.
bool IsMetalink() const
Is it a URL to a metalink.
std::map< std::string, std::string > ParamsMap
void SetHostPort(const std::string &hostName, int port)
std::string GetPathWithFilteredParams() const
Get the path with params, filteres out 'xrdcl.'.
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
void SetPath(const std::string &path)
Set the path.
void SetUserName(const std::string &userName)
Set the username.
static bool HasKSameFS(const XrdCl::URL &url)
Check if given server supports kXR_clone and kXR_samefs.
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInProgress
const uint16_t errSocketTimeout
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const uint16_t suAlreadyDone
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
std::vector< TractInfo > TractList
List of Tracts.
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
const uint16_t errNotSupported
const uint16_t errSocketError
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
const uint16_t errRedirect
const uint16_t errSocketDisconnected
none object for initializing empty Optional
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< CloneLocation > locations
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
uint64_t vwBytes
Total number of bytes written vie writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
uint16_t oFlags2
OpenFlags upper 16 bits.
std::string dataServer
Actual fata server.
uint16_t oFlags
OpenFlags.
Open flags, may be or'd when appropriate.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Samefs
Open file on the same filesystem as another.
@ Update
Open for reading and writing.
@ Dup
Open file duplicating content from another.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
Code
XRootD query request codes.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted