XRootD
Loading...
Searching...
No Matches
XrdClCopy.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdApps/XrdCpFile.hh"
30#include "XrdCl/XrdClLog.hh"
32#include "XrdCl/XrdClUtils.hh"
33#include "XrdCl/XrdClDlgEnv.hh"
34#include "XrdSys/XrdSysE2T.hh"
36
37#include <cstdio>
38#include <iostream>
39#include <iomanip>
40#include <limits>
41
42//------------------------------------------------------------------------------
43// Progress notifier
44//------------------------------------------------------------------------------
46{
47 public:
48 //--------------------------------------------------------------------------
50 //--------------------------------------------------------------------------
51 ProgressDisplay(): pPrevious(0), pPrintProgressBar(true),
52 pPrintSourceCheckSum(false), pPrintTargetCheckSum(false),
53 pPrintAdditionalCheckSum(false)
54 {}
55
56 //--------------------------------------------------------------------------
58 //--------------------------------------------------------------------------
59 virtual void BeginJob( uint16_t jobNum,
60 uint16_t jobTotal,
61 const XrdCl::URL *source,
62 const XrdCl::URL *destination )
63 {
64 XrdSysMutexHelper scopedLock( pMutex );
65 if( pPrintProgressBar )
66 {
67 if( jobTotal > 1 )
68 {
69 std::cerr << "Job: " << jobNum << "/" << jobTotal << std::endl;
70 std::cerr << "Source: " << source->GetURL() << std::endl;
71 std::cerr << "Target: " << destination->GetURL() << std::endl;
72 }
73 }
74 pPrevious = 0;
75
76 JobData d;
77 d.started = time(0);
78 d.source = source;
79 d.target = destination;
80 pOngoingJobs[jobNum] = d;
81 }
82
83 //--------------------------------------------------------------------------
85 //--------------------------------------------------------------------------
86 virtual void EndJob( uint16_t jobNum, const XrdCl::PropertyList *results )
87 {
88 XrdSysMutexHelper scopedLock( pMutex );
89
90 std::map<uint16_t, JobData>::iterator it = pOngoingJobs.find( jobNum );
91 if( it == pOngoingJobs.end() )
92 return;
93
94 JobData &d = it->second;
95
96 // make sure the last available status was printed, which may not be
97 // the case when processing stdio since we throttle printing and don't
98 // know the total size
99 JobProgress( jobNum, d.bytesProcessed, d.bytesTotal );
100
101 if( pPrintProgressBar )
102 {
103 if( pOngoingJobs.size() > 1 )
104 std::cerr << "\r" << std::string(70, ' ') << "\r";
105 else
106 std::cerr << std::endl;
107 }
108
110 results->Get( "status", st );
111 if( !st.IsOK() )
112 {
113 pOngoingJobs.erase(it);
114 return;
115 }
116
117 std::string checkSum;
118 uint64_t size;
119 results->Get( "size", size );
120 if( pPrintSourceCheckSum )
121 {
122 results->Get( "sourceCheckSum", checkSum );
123 PrintCheckSum( d.source, checkSum, size );
124 }
125
126 if( pPrintTargetCheckSum )
127 {
128 results->Get( "targetCheckSum", checkSum );
129 PrintCheckSum( d.target, checkSum, size );
130 }
131
132 if( pPrintAdditionalCheckSum )
133 {
134 std::vector<std::string> addcksums;
135 results->Get( "additionalCkeckSum", addcksums );
136 for( auto &cks : addcksums )
137 PrintCheckSum( d.source, cks, size );
138 }
139
140 pOngoingJobs.erase(it);
141 }
142
143 //--------------------------------------------------------------------------
145 //--------------------------------------------------------------------------
146 std::string GetProgressBar( time_t now )
147 {
148 JobData &d = pOngoingJobs.begin()->second;
149
150 uint64_t speed = 0;
151 if( now-d.started )
152 speed = d.bytesProcessed/(now-d.started);
153 else
154 speed = d.bytesProcessed;
155
156 std::string bar;
157 int prog = 0;
158 int proc = 0;
159
160 if( d.bytesTotal )
161 {
162 prog = (int)((double)d.bytesProcessed/d.bytesTotal*50);
163 proc = (int)((double)d.bytesProcessed/d.bytesTotal*100);
164 }
165 else
166 {
167 prog = 50;
168 proc = 100;
169 }
170 bar.append( prog, '=' );
171 if( prog < 50 )
172 bar += ">";
173
174 std::ostringstream o;
175 o << "[" << XrdCl::Utils::BytesToString(d.bytesProcessed) << "B/";
176 o << XrdCl::Utils::BytesToString(d.bytesTotal) << "B]";
177 o << "[" << std::setw(3) << std::right << proc << "%]";
178 o << "[" << std::setw(50) << std::left;
179 o << bar;
180 o << "]";
181 o << "[" << XrdCl::Utils::BytesToString(speed) << "B/s] ";
182 return o.str();
183 }
184
185 //--------------------------------------------------------------------------
187 //--------------------------------------------------------------------------
188 std::string GetSummaryBar( time_t now )
189 {
190 std::map<uint16_t, JobData>::iterator it;
191 std::ostringstream o;
192
193 for( it = pOngoingJobs.begin(); it != pOngoingJobs.end(); ++it )
194 {
195 JobData &d = it->second;
196 uint16_t jobNum = it->first;
197
198 uint64_t speed = 0;
199 if( now-d.started )
200 speed = d.bytesProcessed/(now-d.started);
201
202 int proc = 0;
203 if( d.bytesTotal )
204 proc = (int)((double)d.bytesProcessed/d.bytesTotal*100);
205 else
206 proc = 100;
207
208 o << "[#" << jobNum << ": ";
209 o << proc << "% ";
210 o << XrdCl::Utils::BytesToString(speed) << "B/s] ";
211 }
212 o << " ";
213 return o.str();
214 }
215
216 //--------------------------------------------------------------------------
218 //--------------------------------------------------------------------------
219 virtual void JobProgress( uint16_t jobNum,
220 uint64_t bytesProcessed,
221 uint64_t bytesTotal )
222 {
223 XrdSysMutexHelper scopedLock( pMutex );
224
225 if( pPrintProgressBar )
226 {
227 time_t now = time(0);
228 if( (now - pPrevious < 1) && (bytesProcessed != bytesTotal) )
229 return;
230 pPrevious = now;
231
232 std::map<uint16_t, JobData>::iterator it = pOngoingJobs.find( jobNum );
233 if( it == pOngoingJobs.end() )
234 return;
235
236 JobData &d = it->second;
237
238 d.bytesProcessed = bytesProcessed;
239 d.bytesTotal = bytesTotal;
240
241 std::string progress;
242 if( pOngoingJobs.size() == 1 )
243 progress = GetProgressBar( now );
244 else
245 progress = GetSummaryBar( now );
246
247 std::cerr << "\r" << progress << std::flush;
248 }
249 }
250
251 //--------------------------------------------------------------------------
253 //--------------------------------------------------------------------------
254 void PrintCheckSum( const XrdCl::URL *url,
255 const std::string &checkSum,
256 uint64_t size )
257 {
258 if( checkSum.empty() )
259 return;
260 std::string::size_type i = checkSum.find( ':' );
261 std::cerr << checkSum.substr( 0, i+1 ) << " ";
262 std::cerr << checkSum.substr( i+1, checkSum.length()-i ) << " ";
263
264 if( url->IsLocalFile() )
265 std::cerr << url->GetPath() << " ";
266 else
267 {
268 std::cerr << url->GetProtocol() << "://" << url->GetHostId();
269 std::cerr << url->GetPath() << " ";
270 }
271
272 std::cerr << size;
273 std::cerr << std::endl;
274 }
275
276 //--------------------------------------------------------------------------
277 // Printing flags
278 //--------------------------------------------------------------------------
279 void PrintProgressBar( bool print ) { pPrintProgressBar = print; }
280 void PrintSourceCheckSum( bool print ) { pPrintSourceCheckSum = print; }
281 void PrintTargetCheckSum( bool print ) { pPrintTargetCheckSum = print; }
282 void PrintAdditionalCheckSum( bool print ) { pPrintAdditionalCheckSum = print; }
283
284 private:
285 struct JobData
286 {
287 JobData(): bytesProcessed(0), bytesTotal(0),
288 started(0), source(0), target(0) {}
289 uint64_t bytesProcessed;
290 uint64_t bytesTotal;
291 time_t started;
292 const XrdCl::URL *source;
293 const XrdCl::URL *target;
294 };
295
296 time_t pPrevious;
297 bool pPrintProgressBar;
298 bool pPrintSourceCheckSum;
299 bool pPrintTargetCheckSum;
300 bool pPrintAdditionalCheckSum;
301 std::map<uint16_t, JobData> pOngoingJobs;
302 XrdSysRecMutex pMutex;
303};
304
305//------------------------------------------------------------------------------
306// Check if we support all the specified user options
307//------------------------------------------------------------------------------
309{
310 if( config->pHost )
311 {
312 std::cerr << "SOCKS Proxies are not yet supported" << std::endl;
313 return false;
314 }
315
316 return true;
317}
318
319//------------------------------------------------------------------------------
320// Append extra cgi info to existing URL
321//------------------------------------------------------------------------------
// Append extra CGI (query) information to a URL.
//
// url    - URL to extend in place
// newCGI - "key=value[&key=value...]" text, optionally starting with '&';
//          a null pointer, an empty string or a lone "&" leaves the URL
//          untouched
//
// The text is joined with '?' when the URL has no query part yet and with
// '&' when it already has one, so existing parameters are never glued to
// the new ones. No separator is added when the URL already ends in '?' or
// '&'.
void AppendCGI( std::string &url, const char *newCGI )
{
  if( !newCGI || !(*newCGI) )
    return;

  // the separator is chosen below, so drop the one supplied by the caller
  if( *newCGI == '&' )
    ++newCGI;

  if( !(*newCGI) )
    return;

  if( url.find( '?' ) == std::string::npos )
    url += '?';
  else
  {
    // url contains '?', hence it is not empty
    const char last = url[url.size() - 1];
    if( last != '?' && last != '&' )
      url += '&';
  }

  url += newCGI;
}
338
339//------------------------------------------------------------------------------
340// Process commandline environment settings
341//------------------------------------------------------------------------------
343{
345
346 XrdCpConfig::defVar *cursor = config->intDefs;
347 while( cursor )
348 {
349 env->PutInt( cursor->vName, cursor->intVal );
350 cursor = cursor->Next;
351 }
352
353 cursor = config->strDefs;
354 while( cursor )
355 {
356 env->PutString( cursor->vName, cursor->strVal );
357 cursor = cursor->Next;
358 }
359}
360
361//------------------------------------------------------------------------------
362// Translate file type to a string for diagnostics purposes
363//------------------------------------------------------------------------------
365{
366 switch( type )
367 {
368 case XrdCpFile::isDir: return "directory";
369 case XrdCpFile::isFile: return "local file";
370 case XrdCpFile::isXroot: return "xroot";
371 case XrdCpFile::isHttp: return "http";
372 case XrdCpFile::isHttps: return "https";
373 case XrdCpFile::isStdIO: return "stdio";
374 default: return "other";
375 };
376}
377
378//------------------------------------------------------------------------------
379// Count the sources
380//------------------------------------------------------------------------------
381uint32_t CountSources( XrdCpFile *file )
382{
383 uint32_t count;
384 for( count = 0; file; file = file->Next, ++count ) {};
385 return count;
386}
387
388//------------------------------------------------------------------------------
389// Adjust file information for the cases when XrdCpConfig cannot do this
390//------------------------------------------------------------------------------
392{
393 //----------------------------------------------------------------------------
394 // If the file is url and the directory offset is not set we set it
395 // to the last slash
396 //----------------------------------------------------------------------------
397 if( file->Doff == 0 )
398 {
399 char *slash = file->Path;
400 for( ; *slash; ++slash ) {};
401 for( ; *slash != '/' && slash > file->Path; --slash ) {};
402 file->Doff = slash - file->Path;
403 }
404};
405
406//------------------------------------------------------------------------------
407// Recursively index all files and directories inside a remote directory
408//------------------------------------------------------------------------------
410 std::string basePath,
411 long dirOffset )
412{
413 using namespace XrdCl;
414
415 Log *log = DefaultEnv::GetLog();
416 log->Debug( AppMsg, "Indexing %s", basePath.c_str() );
417
418 DirectoryList *dirList = 0;
419 XRootDStatus st = fs->DirList( URL( basePath ).GetPath(), DirListFlags::Recursive
420 | DirListFlags::Locate | DirListFlags::Merge, dirList );
421 if( !st.IsOK() )
422 {
423 log->Info( AppMsg, "Failed to get directory listing for %s: %s",
424 basePath.c_str(),
425 st.GetErrorMessage().c_str() );
426 return 0;
427 }
428
429 XrdCpFile start, *current = 0;
430 XrdCpFile *end = &start;
431 int badUrl = 0;
432 for( auto itr = dirList->Begin(); itr != dirList->End(); ++itr )
433 {
434 DirectoryList::ListEntry *e = *itr;
435 if( e->GetStatInfo()->TestFlags( StatInfo::IsDir ) )
436 continue;
437 std::string path = basePath + '/' + e->GetName();
438 current = new XrdCpFile( path.c_str(), badUrl );
439 if( badUrl )
440 {
441 log->Error( AppMsg, "Bad URL: %s", current->Path );
442 delete current;
443 return 0;
444 }
445
446 current->Doff = dirOffset;
447 end->Next = current;
448 end = current;
449 }
450
451 delete dirList;
452
453 return start.Next;
454}
455
456//------------------------------------------------------------------------------
457// Clean up the copy job descriptors
458//------------------------------------------------------------------------------
459void CleanUpResults( std::vector<XrdCl::PropertyList *> &results )
460{
461 std::vector<XrdCl::PropertyList *>::iterator it;
462 for( it = results.begin(); it != results.end(); ++it )
463 delete *it;
464}
465
466//--------------------------------------------------------------------------
467// Let the show begin
468//------------------------------------------------------------------------------
469int main( int argc, char **argv )
470{
471 using namespace XrdCl;
472
473 //----------------------------------------------------------------------------
474 // Configure the copy command, if it returns then everything went well, ugly
475 //----------------------------------------------------------------------------
476 XrdCpConfig config( argv[0] );
477 config.Config( argc, argv, XrdCpConfig::optRmtRec );
478 if( !AllOptionsSupported( &config ) )
479 return 50; // generic error
480 ProcessCommandLineEnv( &config );
481
482 //----------------------------------------------------------------------------
483 // Set options
484 //----------------------------------------------------------------------------
485 CopyProcess process;
486 Log *log = DefaultEnv::GetLog();
487 if( config.Dlvl )
488 {
489 if( config.Dlvl == 1 ) log->SetLevel( Log::InfoMsg );
490 else if( config.Dlvl == 2 ) log->SetLevel( Log::DebugMsg );
491 else if( config.Dlvl == 3 ) log->SetLevel( Log::DumpMsg );
492 }
493
494 ProgressDisplay progress;
495 if( config.Want(XrdCpConfig::DoNoPbar) || !isatty( fileno( stdout ) ) )
496 progress.PrintProgressBar( false );
497
498 bool posc = false;
499 bool force = false;
500 bool coerce = false;
501 bool makedir = false;
502 bool dynSrc = false;
503 bool delegate = false;
504 bool preserveXAttr = false;
505 bool rmOnBadCksum = false;
506 bool continue_ = false;
507 bool recurse = false;
508 bool zipappend = false;
509 bool doserver = false;
510 std::string thirdParty = "none";
511
512 if( config.Want( XrdCpConfig::DoPosc ) ) posc = true;
513 if( config.Want( XrdCpConfig::DoForce ) ) force = true;
514 if( config.Want( XrdCpConfig::DoCoerce ) ) coerce = true;
515 if( config.Want( XrdCpConfig::DoTpc ) ) thirdParty = "first";
516 if( config.Want( XrdCpConfig::DoTpcOnly ) ) thirdParty = "only";
517 if( config.Want( XrdCpConfig::DoZipAppend ) ) zipappend = true;
518 if( config.Want( XrdCpConfig::DoServer ) ) doserver = true;
519 if( config.Want( XrdCpConfig::DoTpcDlgt ) )
520 {
521 // the env var is being set already here (we are issuing a stat
522 // inhere and we need the env var when we are establishing the
523 // connection and authenticating), but we are also setting a delegate
524 // parameter for CopyJob so it can be used on its own.
525 DlgEnv::Instance().Enable();
526 delegate = true;
527 }
528 else
529 DlgEnv::Instance().Disable();
530
531 if( config.Want( XrdCpConfig::DoRecurse ) )
532 {
533 makedir = true;
534 recurse = true;
535 }
536 if( config.Want( XrdCpConfig::DoPath ) ) makedir = true;
537 if( config.Want( XrdCpConfig::DoDynaSrc ) ) dynSrc = true;
538 if( config.Want( XrdCpConfig::DoXAttr ) ) preserveXAttr = true;
539 if( config.Want( XrdCpConfig::DoRmOnBadCksum ) ) rmOnBadCksum = true;
540 if( config.Want( XrdCpConfig::DoContinue ) ) continue_ = true;
541
542 if( force && continue_ )
543 {
544 std::cerr << "Invalid argument combination: continue + force." << std::endl;
545 return 50;
546 }
547
548 //----------------------------------------------------------------------------
549 // Checksums
550 //----------------------------------------------------------------------------
551 std::string checkSumType;
552 std::string checkSumPreset;
553 std::string checkSumMode = "none";
554 if( config.Want( XrdCpConfig::DoCksum ) )
555 {
556 checkSumMode = "end2end";
557 std::vector<std::string> ckSumParams;
558 Utils::splitString( ckSumParams, config.CksVal, ":" );
559 if( ckSumParams.size() > 1 )
560 {
561 if( ckSumParams[1] == "print" )
562 {
563 checkSumMode = "target";
564 progress.PrintTargetCheckSum( true );
565 }
566 else
567 checkSumPreset = ckSumParams[1];
568 }
569 checkSumType = ckSumParams[0];
570 }
571
572 if( config.Want( XrdCpConfig::DoCksrc ) )
573 {
574 checkSumMode = "source";
575 std::vector<std::string> ckSumParams;
576 Utils::splitString( ckSumParams, config.CksVal, ":" );
577 if( ckSumParams.size() == 2 )
578 {
579 checkSumMode = "source";
580 checkSumType = ckSumParams[0];
581 progress.PrintSourceCheckSum( true );
582 }
583 else
584 {
585 std::cerr << "Invalid parameter: " << config.CksVal << std::endl;
586 return 50; // generic error
587 }
588 }
589
590 if( !config.AddCksVal.empty() )
591 progress.PrintAdditionalCheckSum( true );
592
593 //----------------------------------------------------------------------------
594 // ZIP archive
595 //----------------------------------------------------------------------------
596 std::string zipFile;
597 bool zip = false;
598 if( config.Want( XrdCpConfig::DoZip ) )
599 {
600 zipFile = config.zipFile;
601 zip = true;
602 }
603
604 //----------------------------------------------------------------------------
605 // Extreme Copy
606 //----------------------------------------------------------------------------
607 int nbSources = 0;
608 bool xcp = false;
609 if( config.Want( XrdCpConfig::DoSources ) )
610 {
611 nbSources = config.nSrcs;
612 xcp = true;
613 }
614
615 //----------------------------------------------------------------------------
616 // Environment settings
617 //----------------------------------------------------------------------------
619
620 /* Stop PostMaster when exiting main() to ensure proper shutdown */
621 struct scope_exit {
622 ~scope_exit() { XrdCl::DefaultEnv::GetPostMaster()->Stop(); }
623 } stopPostMaster;
624
625 if( config.nStrm != 0 )
626 env->PutInt( "SubStreamsPerChannel", config.nStrm + 1 /*stands for the control stream*/ );
627
628 if( config.Retry != -1 )
629 {
630 env->PutInt( "CpRetry", config.Retry );
631 env->PutString( "CpRetryPolicy", config.RetryPolicy );
632 }
633
634 if( config.Want( XrdCpConfig::DoNoTlsOK ) )
635 env->PutInt( "NoTlsOK", 1 );
636
637 if( config.Want( XrdCpConfig::DoTlsNoData ) )
638 env->PutInt( "TlsNoData", 1 );
639
640 if( config.Want( XrdCpConfig::DoTlsMLF ) )
641 env->PutInt( "TlsMetalink", 1 );
642
643 if( config.Want( XrdCpConfig::DoZipMtlnCksum ) )
644 env->PutInt( "ZipMtlnCksum", 1 );
645
646 int chunkSize = DefaultCPChunkSize;
647 env->GetInt( "CPChunkSize", chunkSize );
648
649 int blockSize = DefaultXCpBlockSize;
650 env->GetInt( "XCpBlockSize", blockSize );
651
652 int parallelChunks = DefaultCPParallelChunks;
653 env->GetInt( "CPParallelChunks", parallelChunks );
654 if( parallelChunks < 1 ||
655 parallelChunks > std::numeric_limits<uint8_t>::max() )
656 {
657 std::cerr << "Can only handle between 1 and ";
658 std::cerr << (int)std::numeric_limits<uint8_t>::max();
659 std::cerr << " chunks in parallel. You asked for " << parallelChunks;
660 std::cerr << "." << std::endl;
661 return 50; // generic error
662 }
663
664 if( !preserveXAttr )
665 {
666 int val = DefaultPreserveXAttrs;
667 env->GetInt( "PreserveXAttrs", val );
668 if( val ) preserveXAttr = true;
669 }
670
671 log->Dump( AppMsg, "Chunk size: %d, parallel chunks %d, streams: %d",
672 chunkSize, parallelChunks, config.nStrm + 1 );
673
674 //----------------------------------------------------------------------------
675 // Build the URLs
676 //----------------------------------------------------------------------------
677 std::vector<XrdCl::PropertyList*> resultVect;
678
679 std::string dest;
680 if( config.dstFile->Protocol == XrdCpFile::isDir ||
682 {
683 dest = "file://";
684
685 // if it is not an absolute path append cwd
686 if( config.dstFile->Path[0] != '/' )
687 {
688 char buf[FILENAME_MAX];
689 char *cwd = getcwd( buf, FILENAME_MAX );
690 if( !cwd )
691 {
692 XRootDStatus st( stError, XProtocol::mapError( errno ), errno );
693 std::cerr << st.GetErrorMessage() << std::endl;
694 return st.GetShellCode();
695 }
696 dest += cwd;
697 dest += '/';
698 }
699 }
700 dest += config.dstFile->Path;
701
702 //----------------------------------------------------------------------------
703 // We need to check whether our target is a file or a directory:
704 // 1) it's a file, so we can accept only one source
705 // 2) it's a directory, so:
706 // * we can accept multiple sources
707 // * we need to append the source name
708 //----------------------------------------------------------------------------
709 bool targetIsDir = false;
710 bool targetExists = false;
711 if( config.dstFile->Protocol == XrdCpFile::isDir )
712 targetIsDir = true;
713 else if( config.dstFile->Protocol == XrdCpFile::isXroot ||
715 {
716 URL target( dest );
717 FileSystem fs( target );
718 StatInfo *statInfo = 0;
719 XRootDStatus st = fs.Stat( target.GetPathWithParams(), statInfo );
720 if( st.IsOK() )
721 {
722 if( statInfo->TestFlags( StatInfo::IsDir ) )
723 targetIsDir = true;
724 targetExists = true;
725 }
726 else if( st.errNo == kXR_NotFound && config.Want( XrdCpConfig::DoPath ) )
727 {
728 int n = strlen(config.dstFile->Path);
729 if( config.dstFile->Path[n-1] == '/' )
730 targetIsDir = true;
731 }
732 else if( st.errNo == kXR_NotAuthorized )
733 {
734 log->Error( AppMsg, "%s (destination)", st.ToString().c_str() );
735 std::cerr << st.ToStr() << std::endl;
736 return st.GetShellCode();
737 }
738
739 delete statInfo;
740 }
741
742 if( !targetIsDir && targetExists && !force && !recurse && !zipappend )
743 {
744 XRootDStatus st( stError, errInvalidOp, EEXIST );
745 // Unable to create /tmp/test.txt; file exists
746 log->Error( AppMsg, "%s (destination)", st.ToString().c_str() );
747 std::cerr << "Run: " << st.ToStr() << std::endl;
748 return st.GetShellCode();
749 }
750
751 //----------------------------------------------------------------------------
752 // If we have multiple sources and target is neither a directory nor stdout
753 // then we cannot proceed
754 //----------------------------------------------------------------------------
755 if( CountSources(config.srcFile) > 1 && !targetIsDir &&
757 {
758 std::cerr << "Multiple sources were given but target is not a directory.";
759 std::cerr << std::endl;
760 return 50; // generic error
761 }
762
763 //----------------------------------------------------------------------------
764 // If we're doing remote recursive copy, chain all the files (if it's a
765 // directory)
766 //----------------------------------------------------------------------------
767 bool remoteSrcIsDir = false;
768 if( config.Want( XrdCpConfig::DoRecurse ) &&
770 {
771 URL source( config.srcFile->Path );
772 FileSystem *fs = new FileSystem( source );
773 StatInfo *statInfo = 0;
774
775 XRootDStatus st = fs->Stat( source.GetPath(), statInfo );
776 if( st.IsOK() && statInfo->TestFlags( StatInfo::IsDir ) )
777 {
778 remoteSrcIsDir = true;
779 //------------------------------------------------------------------------
780 // Recursively index the remote directory
781 //------------------------------------------------------------------------
782 delete config.srcFile;
783 std::string url = source.GetURL();
784 config.srcFile = IndexRemote( fs, url, url.size() );
785 if ( !config.srcFile )
786 {
787 std::cerr << "Error indexing remote directory.";
788 return 50; // generic error
789 }
790 }
791
792 delete fs;
793 delete statInfo;
794 }
795
796 XrdCpFile *sourceFile = config.srcFile;
797 //----------------------------------------------------------------------------
798 // Process the sources
799 //----------------------------------------------------------------------------
800 while( sourceFile )
801 {
802 AdjustFileInfo( sourceFile );
803
804 //--------------------------------------------------------------------------
805 // Create a job for every source
806 //--------------------------------------------------------------------------
807 PropertyList properties;
808 PropertyList *results = new PropertyList;
809 std::string source = sourceFile->Path;
810 if( sourceFile->Protocol == XrdCpFile::isFile )
811 {
812 // make sure it is an absolute path
813 if( source[0] == '/' )
814 source = "file://" + source;
815 else
816 {
817 char buf[FILENAME_MAX];
818 char *cwd = getcwd( buf, FILENAME_MAX );
819 if( !cwd )
820 {
821 XRootDStatus st( stError, XProtocol::mapError( errno ), errno );
822 std::cerr << st.GetErrorMessage() << std::endl;
823 return st.GetShellCode();
824 }
825 source = "file://" + std::string( cwd ) + '/' + source;
826 }
827 }
828
829 AppendCGI( source, config.srcOpq );
830
831 log->Dump( AppMsg, "Processing source entry: %s, type %s, target file: %s",
832 sourceFile->Path, FileType2String( sourceFile->Protocol ),
833 dest.c_str() );
834
835 //--------------------------------------------------------------------------
836 // Set up the job
837 //--------------------------------------------------------------------------
838 std::string target = dest;
839
840
841 bool srcIsDir = false;
842 // if this is local file, for a directory Dlen + Doff will overlap with path size
843 if( strncmp( sourceFile->ProtName, "file", 4 ) == 0 )
844 srcIsDir = std::string( sourceFile->Path ).size() == size_t( sourceFile->Doff + sourceFile->Dlen );
845 // otherwise we are handling a remote file
846 else
847 srcIsDir = remoteSrcIsDir;
848 // if this is a recursive copy make sure we preserve the directory structure
849 if( config.Want( XrdCpConfig::DoRecurse ) && srcIsDir )
850 {
851 // get the source directory
852 std::string srcDir( sourceFile->Path, sourceFile->Doff );
853 // remove the trailing slash
854 if( srcDir[srcDir.size() - 1] == '/' )
855 srcDir = srcDir.substr( 0, srcDir.size() - 1 );
856 size_t diroff = srcDir.rfind( '/' );
857 // if there is no '/' it means a directory name has been given as relative path
858 if( diroff == std::string::npos ) diroff = 0;
859 target += '/';
860 target += sourceFile->Path + diroff;
861 // remove the filename from destination path as it will be appended later anyway
862 target = target.substr( 0 , target.rfind('/') );
863 }
864 AppendCGI( target, config.dstOpq );
865
866 properties.Set( "source", source );
867 properties.Set( "target", target );
868 properties.Set( "force", force );
869 properties.Set( "posc", posc );
870 properties.Set( "coerce", coerce );
871 properties.Set( "makeDir", makedir );
872 properties.Set( "dynamicSource", dynSrc );
873 properties.Set( "thirdParty", thirdParty );
874 properties.Set( "checkSumMode", checkSumMode );
875 properties.Set( "checkSumType", checkSumType );
876 properties.Set( "checkSumPreset", checkSumPreset );
877 properties.Set( "chunkSize", chunkSize );
878 properties.Set( "parallelChunks", parallelChunks );
879 properties.Set( "zipArchive", zip );
880 properties.Set( "xcp", xcp );
881 properties.Set( "xcpBlockSize", blockSize );
882 properties.Set( "delegate", delegate );
883 properties.Set( "targetIsDir", targetIsDir );
884 properties.Set( "preserveXAttr", preserveXAttr );
885 properties.Set( "xrate", config.xRate );
886 properties.Set( "xrateThreshold", config.xRateThreshold );
887 properties.Set( "rmOnBadCksum", rmOnBadCksum );
888 properties.Set( "continue", continue_ );
889 properties.Set( "zipAppend", zipappend );
890 properties.Set( "addcksums", config.AddCksVal );
891 properties.Set( "doServer", doserver );
892
893 if( zip )
894 properties.Set( "zipSource", zipFile );
895
896 if( xcp )
897 properties.Set( "nbXcpSources", nbSources );
898
899
900 XRootDStatus st = process.AddJob( properties, results );
901 if( !st.IsOK() )
902 {
903 std::cerr << "AddJob " << source << " -> " << target << ": ";
904 std::cerr << st.ToStr() << std::endl;
905 }
906 resultVect.push_back( results );
907 sourceFile = sourceFile->Next;
908 }
909
910 //----------------------------------------------------------------------------
911 // Configure the copy process
912 //----------------------------------------------------------------------------
913 PropertyList processConfig;
914 processConfig.Set( "jobType", "configuration" );
915 processConfig.Set( "parallel", config.Parallel );
916 process.AddJob( processConfig, 0 );
917
918 //----------------------------------------------------------------------------
919 // Prepare and run the copy process
920 //----------------------------------------------------------------------------
921 XRootDStatus st = process.Prepare();
922 if( !st.IsOK() )
923 {
924 CleanUpResults( resultVect );
925 std::cerr << "Prepare: " << st.ToStr() << std::endl;
926 return st.GetShellCode();
927 }
928
929 st = process.Run( &progress );
930 if( !st.IsOK() )
931 {
932 if( resultVect.size() == 1 )
933 std::cerr << "Run: " << st.ToStr() << std::endl;
934 else
935 {
936 std::vector<XrdCl::PropertyList*>::iterator it;
937 uint16_t i = 1;
938 uint16_t jobsRun = 0;
939 uint16_t errors = 0;
940 for( it = resultVect.begin(); it != resultVect.end(); ++it, ++i )
941 {
942 if( !(*it)->HasProperty( "status" ) )
943 continue;
944
945 XRootDStatus st = (*it)->Get<XRootDStatus>("status");
946 if( !st.IsOK() )
947 {
948 std::cerr << "Job #" << i << ": " << st.ToStr();
949 ++errors;
950 }
951 ++jobsRun;
952 }
953 std::cerr << "Jobs total: " << resultVect.size();
954 std::cerr << ", run: " << jobsRun;
955 std::cerr << ", errors: " << errors << std::endl;
956 }
957 CleanUpResults( resultVect );
958 return st.GetShellCode();
959 }
960 CleanUpResults( resultVect );
961 return 0;
962}
963
@ kXR_NotAuthorized
Definition XProtocol.hh:998
@ kXR_NotFound
Definition XProtocol.hh:999
bool AllOptionsSupported(XrdCpConfig *config)
Definition XrdClCopy.cc:308
int main(int argc, char **argv)
Definition XrdClCopy.cc:469
const char * FileType2String(XrdCpFile::PType type)
Definition XrdClCopy.cc:364
void ProcessCommandLineEnv(XrdCpConfig *config)
Definition XrdClCopy.cc:342
void CleanUpResults(std::vector< XrdCl::PropertyList * > &results)
Definition XrdClCopy.cc:459
XrdCpFile * IndexRemote(XrdCl::FileSystem *fs, std::string basePath, long dirOffset)
Definition XrdClCopy.cc:409
void AdjustFileInfo(XrdCpFile *file)
Definition XrdClCopy.cc:391
uint32_t CountSources(XrdCpFile *file)
Definition XrdClCopy.cc:381
void AppendCGI(std::string &url, const char *newCGI)
Definition XrdClCopy.cc:322
void PrintAdditionalCheckSum(bool print)
Definition XrdClCopy.cc:282
void PrintSourceCheckSum(bool print)
Definition XrdClCopy.cc:280
virtual void EndJob(uint16_t jobNum, const XrdCl::PropertyList *results)
End job.
Definition XrdClCopy.cc:86
void PrintProgressBar(bool print)
Definition XrdClCopy.cc:279
void PrintCheckSum(const XrdCl::URL *url, const std::string &checkSum, uint64_t size)
Print the checksum.
Definition XrdClCopy.cc:254
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
Job progress.
Definition XrdClCopy.cc:219
std::string GetProgressBar(time_t now)
Get progress bar.
Definition XrdClCopy.cc:146
virtual void BeginJob(uint16_t jobNum, uint16_t jobTotal, const XrdCl::URL *source, const XrdCl::URL *destination)
Begin job.
Definition XrdClCopy.cc:59
std::string GetSummaryBar(time_t now)
Get summary bar.
Definition XrdClCopy.cc:188
ProgressDisplay()
Constructor.
Definition XrdClCopy.cc:51
void PrintTargetCheckSum(bool print)
Definition XrdClCopy.cc:281
static int mapError(int rc)
Copy the data from one point to another.
XRootDStatus Run(CopyProgressHandler *handler)
Run the copy jobs.
XRootDStatus AddJob(const PropertyList &properties, PropertyList *results)
Interface for copy progress notification.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
const std::string & GetName() const
Get file name.
StatInfo * GetStatInfo()
Get the stat info object.
Iterator End()
Get the end iterator.
Iterator Begin()
Get the begin iterator.
bool PutInt(const std::string &key, int value)
Definition XrdClEnv.cc:110
bool PutString(const std::string &key, const std::string &value)
Definition XrdClEnv.cc:52
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Send file/filesystem queries to an XRootD cluster.
XRootDStatus DirList(const std::string &path, DirListFlags::Flags flags, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus Stat(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Handle diagnostics.
Definition XrdClLog.hh:101
void SetLevel(LogLevel level)
Set the level of the messages that should be sent to the destination.
Definition XrdClLog.hh:173
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
bool Stop()
Stop the postmaster.
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
Object stat info.
bool TestFlags(uint32_t flags) const
Test flags.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:212
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:94
std::string GetPathWithParams() const
Get the path with params.
Definition XrdClURL.cc:304
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
bool IsLocalFile() const
Definition XrdClURL.cc:460
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:113
static std::string BytesToString(uint64_t bytes)
Convert bytes to a human readable string.
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
short Doff
Definition XrdCpFile.hh:46
PType Protocol
Definition XrdCpFile.hh:49
char * Path
Definition XrdCpFile.hh:45
char ProtName[8]
Definition XrdCpFile.hh:50
XrdCpFile * Next
Definition XrdCpFile.hh:44
short Dlen
Definition XrdCpFile.hh:47
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
uint32_t errNo
Errno, if any.
int GetShellCode() const
Get the status code that may be returned to the shell.
const char * vName
defVar * intDefs
void Config(int argc, char **argv, int Opts=0)
std::vector< std::string > AddCksVal
const char * dstOpq
static const uint64_t DoZipMtlnCksum
XrdCpFile * srcFile
XrdCpFile * dstFile
char * zipFile
static const uint64_t DoNoPbar
static const uint64_t DoCoerce
static const uint64_t DoForce
static const uint64_t DoRmOnBadCksum
static const uint64_t DoNoTlsOK
static const uint64_t DoTpc
static const uint64_t DoCksum
defVar * strDefs
static const uint64_t DoCksrc
static const uint64_t DoTpcDlgt
static const uint64_t DoZip
static const uint64_t DoContinue
const char * CksVal
static const uint64_t DoRecurse
const char * srcOpq
static const uint64_t DoZipAppend
static const uint64_t DoDynaSrc
int Want(uint64_t What)
long long xRate
static const uint64_t DoSources
static const uint64_t DoXAttr
static const uint64_t DoTlsMLF
static const int optRmtRec
std::string RetryPolicy
static const uint64_t DoPath
static const uint64_t DoPosc
long long xRateThreshold
static const uint64_t DoTpcOnly
static const uint64_t DoTlsNoData
static const uint64_t DoServer