XRootD
Loading...
Searching...
No Matches
XrdXrootdMonitor.cc File Reference
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include "XrdVersion.hh"
#include "XrdNet/XrdNetMsg.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysPlatform.hh"
#include "Xrd/XrdScheduler.hh"
#include "XrdXrootd/XrdXrootdMonitor.hh"
#include "XrdXrootd/XrdXrootdMonFile.hh"
#include "XrdXrootd/XrdXrootdTrace.hh"
+ Include dependency graph for XrdXrootdMonitor.cc:

Go to the source code of this file.

Classes

class  XrdXrootdMonitor_Ident
 
class  XrdXrootdMonitor_Tick
 
class  XrdXrootdMonitorLock
 

Namespaces

namespace  XrdXrootdMonInfo
 

Macros

#define setTMark(TM_mb, TM_en, TM_tm)
 
#define setTMurk(TM_mb, TM_en, TM_tm)
 

Functions

static int32_t XrdXrootdMonInfo::InitStartTime ()
 

Variables

int XrdXrootdMonInfo::kySIDSZ = 0
 
int XrdXrootdMonInfo::LidCGI [4] = {0}
 
int XrdXrootdMonInfo::LidJSON [4] = {0}
 
XrdSysMutex XrdXrootdMonInfo::seqMutex
 
XrdSysTrace XrdXrootdTrace
 

Macro Definition Documentation

◆ setTMark

#define setTMark (   TM_mb,
  TM_en,
  TM_tm 
)
Value:
TM_mb->info[TM_en].arg0.val = mySID; \
TM_mb->info[TM_en].arg0.id[0] = XROOTD_MON_WINDOW; \
TM_mb->info[TM_en].arg1.Window = \
TM_mb->info[TM_en].arg2.Window = static_cast<kXR_int32>(ntohl(TM_tm));
int kXR_int32
Definition XPtypes.hh:89
const kXR_char XROOTD_MON_WINDOW

Definition at line 130 of file XrdXrootdMonitor.cc.

// NOTE(review): the listing dropped the line that opens this class (listing
// line 145) and listing line 161; the constructor below shows the class is
// XrdXrootdMonitor_Ident. Restore both lines from the real source.
//
// Scheduler job that sends the monitor ident record and runs the registered
// Hello "hail" callbacks, then requeues itself idInt seconds later.
//
146 : public XrdJob
147{
148public:
149
// One pass: skipped entirely when idInt is negative. doHail is replaced by the
// result of Hail(), so hailing stops once Hail() reports nothing to send.
// The job is requeued only while something remains to send and idInt > 0.
//
150void DoIt() {
151 if (idInt >= 0)
152 {if (doIdnt) XrdXrootdMonitor::Ident();
153 if (doHail) doHail = XrdXrootdMonitor::Hello::Hail();
154 }
155 if ((doIdnt || doHail) && idInt > 0)
156 Sched->Schedule((XrdJob *)this, time(0)+idInt);
157 }
158
// idt  - value stored in idInt (added to time(0) when requeueing)
// ison - whether the ident record itself is to be sent
//
159 XrdXrootdMonitor_Ident(int idt, bool ison) : XrdJob("monitor ident"),
160 idInt(idt), doIdnt(ison), doHail(true) {}
162
163private:
164int idInt;
165bool doIdnt;
166bool doHail;
167};
168
169/******************************************************************************/
170/* C l a s s X r d X r o o t d M o n i t o r _ T i c k */
171/******************************************************************************/
172
173class XrdXrootdMonitor_Tick : public XrdJob
174{
175public:
176
177void DoIt() {
178#ifndef NODEBUG
179 const char *TraceID = "MonTick";
180#endif
181 time_t Now = XrdXrootdMonitor::Tick();
182 if (Window && Now)
183 Sched->Schedule((XrdJob *)this, Now+Window);
184 else {TRACE(DEBUG, "Monitor clock stopping.");}
185 }
186
187void Set(XrdScheduler *sp, int intvl) {Sched = sp; Window = intvl;}
188
189 XrdXrootdMonitor_Tick() : XrdJob("monitor window clock"),
190 Sched(0), Window(0) {}
192
193private:
194XrdScheduler *Sched; // System scheduler
195int Window;
196};
197
198/******************************************************************************/
199/* C l a s s X r d X r o o t d M o n i t o r L o c k */
200/******************************************************************************/
201
// NOTE(review): the listing dropped the class header (listing line 202) and
// the constructor signature (listing line 210). From the body and from callers
// such as "XrdXrootdMonitorLock mLock(this);" the constructor takes the
// monitor being operated on as 'theMonitor'. Restore both from the real source.
//
// Guard around the single static mutex monLock. An instance locks monLock only
// when constructed for XrdXrootdMonitor::altMon and releases it on destruction.
//
203{
204public:
205
// Explicit lock/unlock of the shared mutex, independent of any instance
//
206static void Lock() {monLock.Lock();}
207
208static void UnLock() {monLock.UnLock();}
209
// Constructor body: unLock records whether this instance took the mutex
//
211 {if (theMonitor != XrdXrootdMonitor::altMon) unLock = 0;
212 else {unLock = 1; monLock.Lock();}
213 }
214 ~XrdXrootdMonitorLock() {if (unLock) monLock.UnLock();}
215
216private:
217
218static XrdSysMutex monLock;
219 char unLock;
220};
221
222XrdSysMutex XrdXrootdMonitorLock::monLock;
223
224/******************************************************************************/
225/* X r d X r o o t d M o n i t o r : : H e l l o */
226/******************************************************************************/
227
228XrdXrootdMonitor::Hello::Hello(const char *dest, char mode)
229 : Next(0), theDest(0), theMode(0)
230{
231 if (dest)
232 {Hello *nP = First;
233 while(nP) {if (!strcmp(dest, nP->theDest) && mode == theMode) return;
234 nP = nP->Next;
235 }
236 Next = First;
237 First = this;
238 theDest = strdup(dest);
239 theMode = mode;
240 }
241}
242
243/******************************************************************************/
244/* X r d X r o o t d M o n i t o r : : H e l l o : : H a i l */
245/******************************************************************************/
246
247XrdXrootdMonitor::Hello *XrdXrootdMonitor::Hello::First = 0;
248
// NOTE(review): the signature (listing line 249) was dropped by the listing;
// the banner above names this XrdXrootdMonitor::Hello::Hail. Restore it from
// the real source.
//
// Invokes Ident() on every object on the list headed by First and reports
// whether that list is non-empty.
//
250{
251 Hello *nP = First;
252
253// Call all the registered ident methods
254//
255 while(nP) {nP->Ident(); nP = nP->Next;}
256
257// Indicate whether or not anything would have been sent
258//
259 return First != 0;
260}
261
262/******************************************************************************/
263/* X r d X r o o t d M o n i t o r : : U s e r : : D i s a b l e */
264/******************************************************************************/
265
// NOTE(review): the signature (listing line 266) was dropped by the listing;
// the banner above names this XrdXrootdMonitor::User::Disable.
//
// Hands the Agent (if any) back through unAlloc() and clears the Fops/Iops
// flags for this user.
//
267{
268 if (Agent)
269 {XrdXrootdMonitor::unAlloc(Agent); Agent = 0;}
270 Fops = Iops = 0;
271}
272
273/******************************************************************************/
274/* X r d X r o o t d M o n i t o r : : U s e r : : E n a b l e */
275/******************************************************************************/
276
// NOTE(review): the signature (listing line 277) was dropped by the listing;
// the banner above names this XrdXrootdMonitor::User::Enable.
//
// Reuses the existing Agent or obtains one via Alloc(1). With an Agent in hand
// Iops/Fops take the global monIO/monFILE settings; otherwise both are cleared.
//
278{
279 if (Agent || (Agent = XrdXrootdMonitor::Alloc(1)))
280 {Iops = XrdXrootdMonitor::monIO;
281 Fops = XrdXrootdMonitor::monFILE;
282 } else Iops = Fops = 0;
283}
284
285/******************************************************************************/
286/* X r d X r o o t d M o n i t o r : : U s e r : : R e g i s t e r */
287/******************************************************************************/
288
289void XrdXrootdMonitor::User::Register(const char *Uname,
290 const char *Hname,
291 const char *Pname, unsigned int xSID)
292{
293#ifndef NODEBUG
294 const char *TraceID = "Monitor";
295#endif
296 char *dotP, *colonP, *atP;
297 char uBuff[1024], tBuff[1024], sBuff[64];
298
299// Decode the user name as a.b:c@d and remap it for monitoring as
300// <protocol>/a.{b|xSID}:<kySID>@host
301//
302 snprintf(tBuff, sizeof(tBuff), "%s", Uname);
303 if ((dotP = index(tBuff, '.')) && (colonP = index(dotP+1, ':')) &&
304 (atP = index(colonP+1, '@')))
305 {*dotP = 0; *colonP = 0; *atP = 0;
306 if (xSID)
307 {snprintf(sBuff, sizeof(sBuff), " %u", xSID);
308 dotP = sBuff;
309 }
310
311 int n = snprintf(uBuff, sizeof(uBuff), "%s/%s.%s:%s@%s", Pname, tBuff,
312 dotP+1, kySID, atP+1);
313
314 if (n < 0 || n >= (int) sizeof(uBuff))
315 TRACE(LOGIN, "Login ID was truncated: " << uBuff);
316
317 if (xSID) {TRACE(LOGIN,"Register remap "<<Uname<<" -> "<<uBuff);}
318 } else snprintf(uBuff, sizeof(uBuff), "%s/%s", Pname, Uname);
319
320// Generate a monitor identity for this user. We do not assign a dictioary
321// identifier unless this entry is reported.
322//
323 Agent = XrdXrootdMonitor::Alloc();
324 Did = 0;
325 Len = strlen(uBuff);
326 Name = strdup(uBuff);
327 Iops = XrdXrootdMonitor::monIO;
328 Fops = XrdXrootdMonitor::monFILE;
329}
330
331/******************************************************************************/
332/* R e p o r t */
333/******************************************************************************/
334
335void XrdXrootdMonitor::User::Report(int eCode, int aCode)
336{
337 char buff[1024];
338
339 snprintf(buff, sizeof(buff), "&Uc=%d&Ec=%d&Ac=%d", ntohl(Did), eCode, aCode);
340
341 XrdXrootdMonitor::Map(XROOTD_MON_MAPUEAC,*this,buff);
342}
343
344/******************************************************************************/
345
346bool XrdXrootdMonitor::User::Report(WhatInfo infoT, const char *info)
347{
348 char buff[4096];
349
350// Currently we support only the token external report
351//
352 if (infoT != TokenInfo) return false;
353
354 snprintf(buff, sizeof(buff), "&Uc=%d%s%s", ntohl(Did),
355 (*info == '&' ? "" : "&"), info);
356
357 XrdXrootdMonitor::Map(XROOTD_MON_MAPTOKN,*this,buff);
358
359 return true;
360}
361/******************************************************************************/
362/* C o n s t r u c t o r */
363/******************************************************************************/
364
// NOTE(review): the signature (listing line 365) was dropped by the listing.
// Callers in this file create the object with "new XrdXrootdMonitor()".
//
// Allocates the page-aligned monitor buffer of monBlen bytes and places an
// opening window mark for the current window in entry 0.
//
366{
367 kXR_int32 localWindow;
368
369// Initialize last window to force a mark as well as the local window
370//
371 lastWindow = 0;
372 localWindow = currWindow;
373
374// Allocate a monitor buffer
375//
// NOTE(review): Alloc(), Init() and the destructor test monBuff for null
// after a failed allocation, but nothing here sets it to zero on failure;
// confirm monBuff is initialized elsewhere (e.g. by a member initializer on
// the dropped signature line).
376 if (posix_memalign((void **)&monBuff, getpagesize(), monBlen))
377 eDest->Emsg("Monitor", "Unable to allocate monitor buffer.");
378 else {nextEnt = 1;
379 setTMark(monBuff, 0, localWindow);
380 }
381}
382
383/******************************************************************************/
384/* D e s t r u c t o r */
385/******************************************************************************/
386
387XrdXrootdMonitor::~XrdXrootdMonitor()
388{
389// Release buffer
390 if (monBuff) {Flush(); free(monBuff);}
391}
392
393/******************************************************************************/
394/* a p p I D */
395/******************************************************************************/
396
397void XrdXrootdMonitor::appID(char *id)
398{
399 static const int apInfoSize = sizeof(XrdXrootdMonTrace)-4;
400
401// Application ID's are only meaningful for io event recording
402//
403 if (this == altMon || !*id) return;
404
405// Fill out the monitor record
406//
407 if (lastWindow != currWindow) Mark();
408 else if (nextEnt == lastEnt) Flush();
409 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_APPID;
410 strncpy((char *)(&(monBuff->info[nextEnt])+4), id, apInfoSize);
411}
412
413/******************************************************************************/
414/* A l l o c */
415/******************************************************************************/
416
417XrdXrootdMonitor *XrdXrootdMonitor::Alloc(int force)
418{
420 int lastVal;
421
422// If enabled, create a new object (if possible). If we are not monitoring
423// i/o then return the global object.
424//
425// if (!isEnabled || (isEnabled < 0 && !force)) mp = 0;
426 if (!isEnabled) mp = 0;
427 else if (!monIO) mp = altMon;
428 else if ((mp = new XrdXrootdMonitor()))
429 if (!(mp->monBuff)) {delete mp; mp = 0;}
430
431// Check if we should turn on the monitor clock
432//
433 if (mp && isEnabled < 0)
434 {windowMutex.Lock();
435 lastVal = numMonitor; numMonitor++;
436 if (!lastVal && !monREDR) startClock();
437 windowMutex.UnLock();
438 }
439
440// All done
441//
442 return mp;
443}
444
445/******************************************************************************/
446/* C l o s e */
447/******************************************************************************/
448
449void XrdXrootdMonitor::Close(kXR_unt32 dictid, long long rTot, long long wTot)
450{
451 XrdXrootdMonitorLock mLock(this);
452 unsigned int rVal, wVal;
453
454// Fill out the monitor record (we allow the compiler to correctly cast data)
455//
456 if (lastWindow != currWindow) Mark();
457 else if (nextEnt == lastEnt) Flush();
458 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_CLOSE;
459 monBuff->info[nextEnt].arg0.id[1] = do_Shift(rTot, rVal);
460 monBuff->info[nextEnt].arg0.rTot[1] = htonl(rVal);
461 monBuff->info[nextEnt].arg0.id[2] = do_Shift(wTot, wVal);
462 monBuff->info[nextEnt].arg0.id[3] = 0;
463 monBuff->info[nextEnt].arg1.wTot = htonl(wVal);
464 monBuff->info[nextEnt++].arg2.dictid = dictid;
465
466// Check if we need to duplicate this entry
467//
468 if (altMon && this != altMon) altMon->Dup(&monBuff->info[nextEnt-1]);
469}
470
471/******************************************************************************/
472/* D e f a u l t s */
473/******************************************************************************/
474
475// This version must be called after the subsequent version!
476
477void XrdXrootdMonitor::Defaults(char *dest1, int mode1, char *dest2, int mode2)
478{
479 int mmode;
480
481// If there are no destination then only g-stream events may be enabled.
482// Otherwise, sort out the destinations relative to modes.
483//
484 if (!dest1 && !dest2) {isEnabled = 0; return;}
485 if (!dest1) {dest1 = dest2; dest2 = 0; mode1 |= mode2; mode2 = 0;}
486
487// Set the default destinations (caller supplied strdup'd strings)
488//
489 if (Dest1) free(Dest1);
490 Dest1 = dest1; monMode1 = mode1;
491 if (Dest2) free(Dest2);
492 Dest2 = dest2; monMode2 = mode2;
493
494// Set overall monitor mode
495//
496 mmode = mode1 | mode2;
497 monACTIVE = (mmode ? 1 : 0);
498 isEnabled = (mmode & XROOTD_MON_ALL ? 1 :-1);
499 monIO = (mmode & XROOTD_MON_IO ? 1 : 0);
500 monIO = (mmode & XROOTD_MON_IOV ? 2 : monIO);
501 monINFO = (mmode & XROOTD_MON_INFO ? 1 : 0);
502 monFILE = (mmode & XROOTD_MON_FILE ? 1 : 0) | monIO;
503 monREDR = (mmode & XROOTD_MON_REDR ? 1 : 0);
504 monUSER = (mmode & XROOTD_MON_USER ? 1 : 0);
505 monAUTH = (mmode & XROOTD_MON_AUTH ? 1 : 0);
506 monFSTAT = (mmode & XROOTD_MON_FSTA && monFSTAT ? 1 : 0);
507
508// Compute whether or not we need the clock running
509//
510 if (monREDR || (isEnabled > 0 && (monIO || monFILE))) monCLOCK = 1;
511
512// Check where user information should go
513//
514 if (((mode1 & XROOTD_MON_IO) && (mode1 & XROOTD_MON_USER))
515 || ((mode2 & XROOTD_MON_IO) && (mode2 & XROOTD_MON_USER)))
516 {if ((!(mode1 & XROOTD_MON_IO) && (mode1 & XROOTD_MON_USER))
517 || (!(mode2 & XROOTD_MON_IO) && (mode2 & XROOTD_MON_USER))) monUSER = 3;
518 else monUSER = 2;
519 }
520
521// If we are monitoring redirections then set an envar saying how often idents
522// should be sent (this also tips off other layers to handle such monitoring)
523//
524 if (monREDR) XrdOucEnv::Export("XRDMONRDR", monIdent);
525}
526
527/******************************************************************************/
528
529void XrdXrootdMonitor::Defaults(int msz, int rsz, int wsz,
530 int flush, int flash, int idt, int rnm,
531 int fbsz, int fsint, int fsopt, int fsion)
532{
533
534// Set default window size and flush time
535//
536 sizeWindow = (wsz <= 0 ? 60 : wsz);
537 autoFlush = (flush <= 0 ? 600 : flush);
538 autoFlash = (flash <= 0 ? 0 : flash);
539 monIdent = idt;
540 rdrNum = (rnm <= 0 || rnm > rdrMax ? 3 : rnm);
541 rdrWin = (sizeWindow > 16777215 ? 16777215 : sizeWindow);
542 rdrWin = htonl(rdrWin);
543
544// Set the fstat defaults
545//
546 XrdXrootdMonFile::Defaults(fsint, fsopt, fsion, fbsz);
547 monFSTAT = fsint != 0;
548
549// Set default monitor buffer size
550//
551 if (msz <= 0) msz = 16384;
552 else if (msz < 1024) msz = 1024;
553 else msz = msz/sizeof(XrdXrootdMonTrace)*sizeof(XrdXrootdMonTrace);
554 lastEnt = (msz-sizeof(XrdXrootdMonHeader))/sizeof(XrdXrootdMonTrace);
555 monBlen = (lastEnt*sizeof(XrdXrootdMonTrace))+sizeof(XrdXrootdMonHeader);
556 lastEnt--;
557
558// Set default monitor redirect buffer size
559//
560 if (rsz <= 0) rsz = 32768;
561 else if (rsz < 2048) rsz = 2048;
562 lastRnt = (rsz-(sizeof(XrdXrootdMonHeader) + 16))/sizeof(XrdXrootdMonRedir);
563 monRlen = (lastRnt*sizeof(XrdXrootdMonRedir))+sizeof(XrdXrootdMonHeader)+16;
564 lastRnt--;
565}
566
567/******************************************************************************/
568/* D i s c */
569/******************************************************************************/
570
571void XrdXrootdMonitor::Disc(kXR_unt32 dictid, int csec, char Flags)
572{
573 XrdXrootdMonitorLock mLock(this);
574
575// Check if this should not be included in the io trace
576//
577 if (this != altMon && monUSER == 1 && altMon)
578 {altMon->Disc(dictid, csec); return;}
579
580// Fill out the monitor record (let compiler cast the data correctly)
581//
582 if (lastWindow != currWindow) Mark();
583 else if (nextEnt == lastEnt) Flush();
584 monBuff->info[nextEnt].arg0.rTot[0] = 0;
585 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_DISC;
586 monBuff->info[nextEnt].arg0.id[1] = Flags;
587 monBuff->info[nextEnt].arg1.wTot = htonl(csec);
588 monBuff->info[nextEnt++].arg2.dictid = dictid;
589
590// Check if we need to duplicate this entry
591//
592 if (altMon && this != altMon && monUSER == 3)
593 altMon->Dup(&monBuff->info[nextEnt-1]);
594}
595
596/******************************************************************************/
597/* D u p */
598/******************************************************************************/
599
600void XrdXrootdMonitor::Dup(XrdXrootdMonTrace *mrec)
601{
602 XrdXrootdMonitorLock mLock(this);
603
604// Fill out the monitor record
605//
606 if (lastWindow != currWindow) Mark();
607 else if (nextEnt == lastEnt) Flush();
608 memcpy(&monBuff->info[nextEnt],(const void *)mrec,sizeof(XrdXrootdMonTrace));
609 nextEnt++;
610}
611
612/******************************************************************************/
613/* Private: F e t c h */
614/******************************************************************************/
615
616XrdXrootdMonitor::MonRdrBuff *XrdXrootdMonitor::Fetch()
617{
618 MonRdrBuff *bP;
619
620// Get the next available stream and promote another one
621//
622 rdrMutex.Lock();
623 if ((bP = rdrMP)) rdrMP = rdrMP->Next;
624 rdrMutex.UnLock();
625 return bP;
626}
627
628/******************************************************************************/
629/* I n i t */
630/******************************************************************************/
631
// NOTE(review): the first line of this signature (listing line 632) was
// dropped by the listing; it declares the 'sp' and 'errp' parameters that are
// stored in Sched and eDest below. Restore it from the real source.
//
// Records the scheduler and error object, builds the server identification
// record (idRec/idLen) and pre-formats the server identity in CGI and JSON
// form. Index k of SidCGI/SidJSON holds the text built from the first k+1 of
// site, host, port+instance, program+version; LidCGI/LidJSON hold the lengths.
//
633 const char *iHost, const char *iProg,
634 const char *iName, int Port)
635{
636 const char *cgID0 = "&site=%s";
637 const char *cgID1 = "&host=%s";
638 const char *cgID2 = "&port=%d&inst=%s";
639 const char *cgID3 = "&pgm=%s&ver=%s";
640
// Each JSON format after the first is given the previous text (with its
// closing brace replaced by a comma) as its leading %s.
641 const char *jsID0 = "\"src\":{\"site\":\"%s\"}";
642 const char *jsID1 = "%s\"host\":\"%s\"}";
643 const char *jsID2 = "%s\"port\":%d,\"inst\":\"%s\"}";
644 const char *jsID3 = "%s\"pgm\":\"%s\",\"ver\":\"%s\"}";
645
646 XrdXrootdMonMap *mP;
647 char iBuff[1024], iMuff[2048], iPuff[1024];
648 int n, i, j;
649
650// Set static variables
651//
652 Sched = sp;
653 eDest = errp;
654
655// Generate our server ID (the version is not part of the fingerprint)
656//
657 strcpy(iBuff, "=/");
658 kySID = XrdOucUtils::Ident(mySID, iBuff+2, sizeof(iBuff)-2,
659 iHost, iProg, iName, Port);
660 n = strlen(iBuff);
661 snprintf(iBuff+n, sizeof(iBuff)-n, "&ver=%s", XrdVERSION);
662
663 kySIDSZ = strlen(kySID);
664 monHost = strdup(iHost);
665
666// Ignore array bounds warning from gcc 12 triggered because the allocated
667// memory for the XrdXrootdMonMap is smaller than sizeof(XrdXrootdMonMap)
668#if defined(__GNUC__) && __GNUC__ >= 12
669#pragma GCC diagnostic push
670#pragma GCC diagnostic ignored "-Warray-bounds"
671#endif
672// Create identification record
673//
// NOTE(review): the malloc() result is used without a null check.
674 idLen = strlen(iBuff) + sizeof(XrdXrootdMonHeader) + sizeof(kXR_int32);
675 idRec = (char *)malloc(idLen+1);
676 mP = (XrdXrootdMonMap *)idRec;
677 fillHeader(&(mP->hdr), XROOTD_MON_MAPIDNT, idLen);
678 mP->hdr.pseq = 0;
679 mP->dictid = 0;
680 strcpy(mP->info, iBuff);
681#if defined(__GNUC__) && __GNUC__ >= 12
682#pragma GCC diagnostic pop
683#endif
684
685// Generate a CGI version of all the variations
686//
// Here i is the length snprintf() reports for the piece just written, j the
// offset of the next piece in iPuff and n the space left.
// NOTE(review): the snprintf() results are not checked for truncation; a
// result >= n would make the following n negative and j point past the data.
687 const char *Site (getenv("XRDSITE") ? getenv("XRDSITE") : "");
688 i = snprintf(iPuff, sizeof(iPuff), cgID0, Site);
689 SidCGI[0] = strdup(iPuff);
690 LidCGI[0] = strlen(iPuff);
691
692 n = sizeof(iPuff)-i; j = i;
693 i = snprintf(iPuff+j, n, cgID1, iHost);
694 SidCGI[1] = strdup(iPuff);
695 LidCGI[1] = strlen(iPuff);
696
697 n -= i; j += i;
698 i = snprintf(iPuff+j, n, cgID2, Port, iName);
699 SidCGI[2] = strdup(iPuff);
700 LidCGI[2] = strlen(iPuff);
701
702 n -= i; j += i;
703 snprintf(iPuff+j, n, cgID3, iProg, XrdVERSION);
704 SidCGI[3] = strdup(iPuff);
705 LidCGI[3] = strlen(iPuff);
706
707// Generate a JSON version of all the variations.
708//
// The two buffers alternate: after each variant is saved, its final '}' is
// overwritten with ',' so it can serve as the prefix of the next variant.
// NOTE(review): this also relies on the snprintf() results being the actual
// lengths written (no truncation).
709 n = snprintf(iPuff, sizeof(iPuff), jsID0, Site);
710 SidJSON[0] = strdup(iPuff);
711 LidJSON[0] = strlen(iPuff);
712
713 strcpy(iPuff+n-1, ",");
714 n = snprintf(iMuff, sizeof(iMuff), jsID1, iPuff, iHost);
715 SidJSON[1] = strdup(iMuff);
716 LidJSON[1] = strlen(iMuff);
717
718 strcpy(iMuff+n-1, ",");
719 n = snprintf(iPuff, sizeof(iPuff), jsID2, iMuff, Port, iName);
720 SidJSON[2] = strdup(iPuff);
721 LidJSON[2] = strlen(iPuff);
722
723 strcpy(iPuff+n-1, ",");
724 snprintf(iMuff, sizeof(iMuff), jsID3, iPuff, iProg, XrdVERSION);
725 SidJSON[3] = strdup(iMuff);
726 LidJSON[3] = strlen(iMuff);
727}
728
729/******************************************************************************/
730
// NOTE(review): the signature (listing line 731) was dropped by the listing.
// The body takes no arguments and returns 1 on success, 0 on failure.
//
// Creates the network endpoints for the configured destinations, schedules
// the ident job, and then, when monitoring is enabled, sets up the alternate
// monitor, the clock, file statistics and the redirect buffers.
//
732{
733 static XrdXrootdMonitor_Ident MonIdent(monIdent, isEnabled);
734 int i, Now = time(0);
735 bool aOK;
736
737// Setup the primary destination
738//
739 if (Dest1)
740 {InetDest1 = new XrdNetMsg(eDest, Dest1, &aOK);
741 if (!aOK)
742 {eDest->Emsg("Monitor", "Unable to setup primary monitor collector.");
743 return 0;
744 }
745 }
746
747// Setup the secondary destination
748//
749 if (Dest2)
750 {InetDest2 = new XrdNetMsg(eDest, Dest2, &aOK);
751 if (!aOK)
752 {eDest->Emsg("Monitor","Unable to setup secondary monitor collector.");
753 return 0;
754 }
755 }
756
757// Now schedule the first identification record
758//
759 if (Sched && monIdent >= 0) Sched->Schedule((XrdJob *)&MonIdent);
760
761// There is nothing more to do unless we have been enabled via Defaults()
762//
763 if (!isEnabled) return 1;
764
765// If there is a destination that is only collecting file events, then
766// allocate a global monitor object but don't start the timer just yet.
767//
768 if ((monMode1 && !(monMode1 & XROOTD_MON_IO))
769 || (monMode2 && !(monMode2 & XROOTD_MON_IO)))
770 if (!(altMon = new XrdXrootdMonitor()) || !altMon->monBuff)
771 {if (altMon) {delete altMon; altMon = 0;}
772 eDest->Emsg("Monitor","allocate monitor; insufficient storage.");
773 return 0;
774 }
775
776// Turn on the monitoring clock if we need it running all the time
777//
778 if (monCLOCK) startClock();
779
780// If we are monitoring file stats then start that up
781//
782 if (!Sched || !monFSTAT) monFSTAT = 0;
783 else if (!XrdXrootdMonFile::Init()) return 0;
784
785// If we are not monitoring redirections, we are done!
786//
787 if (!monREDR) return 1;
788
789// Allocate as many redirection monitors as requested
790//
// Each buffer's Next points at the previous element; element 0 is pointed at
// the last element after the loop, which closes the ring.
791 for (i = 0; i < rdrNum; i++)
792 {if (posix_memalign((void **)&rdrMon[i].Buff, getpagesize(),monRlen))
793 {eDest->Emsg("Monitor", "Unable to allocate monitor rdr buffer.");
794 return 0;
795 }
796 rdrMon[i].Buff->sID = mySID;
797 rdrMon[i].Buff->sXX[0] = XROOTD_MON_REDSID;
798 rdrMon[i].Next = (i ? &rdrMon[i-1] : &rdrMon[0]);
799 rdrMon[i].nextEnt = 0;
800 rdrMon[i].flushIt = Now + autoFlush;
801 rdrMon[i].lastTOD = 0;
802 }
803 rdrMon[0].Next = &rdrMon[i-1];
804 rdrMP = &rdrMon[0];
805
806// All done
807//
808 return 1;
809}
810
811/******************************************************************************/
812/* G e t D i c t I D */
813/******************************************************************************/
814
// NOTE(review): the signature (listing line 815) was dropped by the listing;
// it declares the 'hbo' flag tested below. Listing line 825 is also missing,
// and no unlock of seqMutex is visible after the Lock() call; presumably that
// is the dropped line. Confirm against the real source.
//
// Returns the next value of a process-wide sequence counter that starts at 1:
// as is when hbo is set, passed through htonl() otherwise.
//
816{
817 static XrdSysMutex seqMutex;
818 static unsigned int monSeqID = 1;
819 unsigned int mySeqID;
820
821// Assign a unique ID for this entry
822//
823 seqMutex.Lock();
824 mySeqID = monSeqID++;
826
827// Return the ID
828//
829 if (hbo) return mySeqID;
830 return htonl(mySeqID);
831}
832
833/******************************************************************************/
834/* Private: M a p */
835/******************************************************************************/
836
837kXR_unt32 XrdXrootdMonitor::Map(char code, XrdXrootdMonitor::User &uInfo,
838 const char *path)
839{
840 XrdXrootdMonMap map;
841 int size, montype;
842
843// Copy in the username and path
844//
845 map.dictid = GetDictID();
846 strcpy(map.info, uInfo.Name);
847 size = uInfo.Len;
848 if (path)
849 {*(map.info+size) = '\n';
850 strlcpy(map.info+size+1, path, sizeof(map.info)-size-1);
851 size = size + strlen(path) + 1;
852 }
853
854// Fill in the header
855//
856 size = sizeof(XrdXrootdMonHeader)+sizeof(kXR_int32)+size;
857 fillHeader(&map.hdr, code, size);
858
859// Route the packet to all destinations that need them
860//
861 if (code == XROOTD_MON_MAPPATH) montype = XROOTD_MON_PATH;
862 else if (code == XROOTD_MON_MAPUSER
863 || code == XROOTD_MON_MAPTOKN
864 || code == XROOTD_MON_MAPUEAC) montype = XROOTD_MON_USER;
865 else montype = XROOTD_MON_INFO;
866 Send(montype, (void *)&map, size);
867
868// Return the dictionary id
869//
870 return map.dictid;
871}
872
873/******************************************************************************/
874/* O p e n */
875/******************************************************************************/
876
877void XrdXrootdMonitor::Open(kXR_unt32 dictid, off_t fsize)
878{
879 XrdXrootdMonitorLock mLock(this);
880
881 if (lastWindow != currWindow) Mark();
882 else if (nextEnt == lastEnt) Flush();
883 h2nll(fsize, monBuff->info[nextEnt].arg0.val);
884 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_OPEN;
885 monBuff->info[nextEnt].arg1.buflen = 0;
886 monBuff->info[nextEnt++].arg2.dictid = dictid;
887
888// Check if we need to duplicate this entry
889//
890 if (altMon && this != altMon) altMon->Dup(&monBuff->info[nextEnt-1]);
891}
892
893/******************************************************************************/
894/* R e d i r e c t */
895/******************************************************************************/
896
// Record a redirect event in one of the redirect buffers.
//
// mID   - dictionary id stored in the record as passed
// hName - target host; text from a '?' on is ignored. When it starts with
//         '/' it is taken to be the path and the host is recorded as empty.
// Port  - target port
// opC   - operation code or'd into the record type
// Path  - path being redirected (unused when hName is a path)
//
// Always returns 0.
//
// NOTE(review): listing line 900, which would hold the declaration of 'mtP'
// used below, was dropped by the listing; restore it from the real source.
//
897int XrdXrootdMonitor::Redirect(kXR_unt32 mID, const char *hName, int Port,
898 char opC, const char *Path)
899{
901 MonRdrBuff *mP = Fetch();
902 int n, slots, hLen, pLen;
903 char *dest;
904
905// Take care of the server's name which might actually be a path
906//
// The host part is limited to 256 characters.
907 if (*hName == '/') {Path = hName; hName = ""; hLen = 0;}
908 else {const char *quest = index(hName, '?');
909 hLen = (quest ? quest - hName : strlen(hName));
910 if (hLen > 256) hLen = 256;
911 }
912
913// Take care of the path
914//
// The path part is limited to 1024 characters.
// NOTE(review): Path is passed to strlen() without a null check.
915 pLen = strlen(Path);
916 if (pLen > 1024) pLen = 1024;
917
918// Compute number of entries needed here
919//
// slots is n rounded up to whole XrdXrootdMonRedir entries; pLen is then
// reset to the space that remains in those slots after "<host>:".
920 n = (hLen + 1 + pLen + 1); // "<host>:<path>\0"
921 slots = n / sizeof(XrdXrootdMonRedir);
922 if (n % sizeof(XrdXrootdMonRedir)) slots++;
923 pLen = slots * sizeof(XrdXrootdMonRedir) - (hLen+1);
924
925// Obtain a lock on this buffer
926//
927 if (!mP) return 0;
928 mP->Mutex.Lock();
929
930// If we don't have enough slots, flush this buffer. Note that we account for
931// the ending timing mark here (an extra slot).
932//
933 if (mP->nextEnt + slots + 2 >= lastRnt) Flush(mP);
934
935// Check if we need a timing mark
936//
937 if (mP->lastTOD != rdrTOD)
938 {mP->lastTOD = rdrTOD;
939 setTMurk(mP->Buff, mP->nextEnt, mP->lastTOD);
940 mP->nextEnt++;
941 }
942
943// Fill out the buffer
944//
// The text "<host>:<path>" is written into the slots that follow the record
// entry itself.
945 mtP = &(mP->Buff->info[mP->nextEnt]);
946 mtP->arg0.rdr.Type = XROOTD_MON_REDIRECT | opC;
947 mtP->arg0.rdr.Dent = static_cast<char>(slots);
948 mtP->arg0.rdr.Port = htons(static_cast<short>(Port));
949 mtP->arg1.dictid = mID;
950 dest = (char *)(mtP+1);
951 strncpy(dest, hName,hLen); dest += hLen; *dest++ = ':';
952 strncpy(dest, Path, pLen);
953
954// Adjust pointer and return
955//
956 mP->nextEnt = mP->nextEnt + (slots+1);
957 mP->Mutex.UnLock();
958 return 0;
959}
960
961
962/******************************************************************************/
963/* T i c k */
964/******************************************************************************/
965
// NOTE(review): the signature (listing line 966) and listing lines 981 and
// 986 were dropped by the listing, which leaves the braces around the
// alternate monitor check unbalanced as shown. Restore them from the real
// source before building.
//
// Advances the current window to the present time, flushes the alternate
// monitor and the redirect buffers when their flush times have been reached,
// and returns the current time, or 0 when the clock has no reason to keep
// running.
//
967{
968 time_t Now = time(0);
969 int nextFlush;
970
971// We can safely set the window as we are the only ones doing so and memory
972// access is atomic as long as it sits within a cache line (which it does).
973//
974 currWindow = static_cast<kXR_int32>(Now);
975 rdrTOD = htonl(currWindow);
976 nextFlush = currWindow + autoFlush;
977
978// Check to see if we should flush the alternate monitor
979//
// FlushTime is tested twice; an alternate monitor holding nothing beyond its
// opening mark just has its flush time pushed out.
980 if (altMon && currWindow >= FlushTime)
982 if (currWindow >= FlushTime)
983 {if (altMon->nextEnt > 1) altMon->Flush();
984 else FlushTime = nextFlush;
985 }
987 }
988
989// Now check to see if we need to flush redirect buffers
990//
// An empty redirect buffer only has its flush time pushed out.
991 if (monREDR)
992 {int n = rdrNum;
993 while(n--)
994 {rdrMon[n].Mutex.Lock();
995 if (rdrMon[n].nextEnt == 0) rdrMon[n].flushIt = nextFlush;
996 else if (rdrMon[n].flushIt <= currWindow) Flush(&rdrMon[n]);
997 rdrMon[n].Mutex.UnLock();
998 }
999 }
1000
1001// All done. Stop the clock if there is no reason for it to be running. The
1002// clock always runs if we are monitoring redirects or all clients. Otherwise,
1003// the clock only runs if we have one or more client-specific monitors.
1004//
1005 if (!monREDR && isEnabled < 0)
1006 {windowMutex.Lock();
1007 if (!numMonitor) Now = 0;
1008 windowMutex.UnLock();
1009 }
1010 return Now;
1011}
1012
1013/******************************************************************************/
1014/* u n A l l o c */
1015/******************************************************************************/
1016
1017void XrdXrootdMonitor::unAlloc(XrdXrootdMonitor *monp)
1018{
1019
1020// We must delete this object if we are de-allocating the local monitor.
1021//
1022 if (monp != altMon) delete monp;
1023
1024// Decrease number being monitored if in selective mode
1025//
1026 if (isEnabled < 0)
1027 {windowMutex.Lock();
1028 numMonitor--;
1029 windowMutex.UnLock();
1030 }
1031}
1032
1033/******************************************************************************/
1034/* P r i v a t e M e t h o d s */
1035/******************************************************************************/
1036/******************************************************************************/
1037/* d o _ S h i f t */
1038/******************************************************************************/
1039
1040unsigned char XrdXrootdMonitor::do_Shift(long long xTot, unsigned int &xVal)
1041{
1042 const long long smask = 0x7fffffff00000000LL;
1043 const long long xmask = 0x7fffffffffffffffLL;
1044 unsigned char xshift = 0;
1045
1046 xTot &= xmask;
1047 while(xTot & smask) {xTot = xTot >> 1LL; xshift++;}
1048 xVal = static_cast<unsigned int>(xTot);
1049
1050 return xshift;
1051}
1052
1053/******************************************************************************/
1054/* f i l l H e a d e r */
1055/******************************************************************************/
1056
1057void XrdXrootdMonitor::fillHeader(XrdXrootdMonHeader *hdr,
1058 const char id, int size)
1059{
1060
1061// Fill in the header
1062//
1063 hdr->code = static_cast<kXR_char>(id);
1064// hdr->pseq = static_cast<kXR_char>(myseq); // Filled in Send()
1065 hdr->plen = htons(static_cast<uint16_t>(size));
1066 hdr->stod = startTime;
1067}
1068
1069/******************************************************************************/
1070/* F l u s h */
1071/******************************************************************************/
1072
1073void XrdXrootdMonitor::Flush()
1074{
1075 int size;
1076 kXR_int32 localWindow, now;
1077
1078// Do not flush if the buffer is empty
1079//
1080 if (nextEnt <= 1) return;
1081
1082// Get the current window marker. No need for locks as simple memory accesses
1083// are sufficiently synchrnozed for our purposes.
1084//
1085 localWindow = currWindow;
1086
1087// Fill in the header and in the process we will have the current time
1088//
1089 size = (nextEnt+1)*sizeof(XrdXrootdMonTrace)+sizeof(XrdXrootdMonHeader);
1090 fillHeader(&monBuff->hdr, XROOTD_MON_MAPTRCE, size);
1091
1092// Punt on the right ending time. We are trying to keep same-sized windows
1093// This was corrected by Matevz Tadel, as before we were using real time which
1094// could have been far into the future due to simple inactivity. So, Place the
1095// computed ending timing mark.
1096//
1097 now = lastWindow + sizeWindow;
1098 setTMark(monBuff, nextEnt, now);
1099
1100// Send off the buffer and reinitialize it
1101//
1102 if (this != altMon) Send(XROOTD_MON_IO, (void *)monBuff, size);
1103 else {Send(XROOTD_MON_FILE, (void *)monBuff, size);
1104 FlushTime = localWindow + autoFlush;
1105 }
1106 setTMark(monBuff, 0, localWindow);
1107 nextEnt = 1;
1108}
1109
1110/******************************************************************************/
1111
// Flush the redirect buffer described by mP: re-arm its flush deadline and,
// unless it is empty, close it with an ending timing mark, fill in its header
// and pass it to Send() under XROOTD_MON_REDR.
//
// mP - the redirect buffer descriptor to flush; it is dereferenced without a
//      null check. Its flushIt, lastTOD and nextEnt members are updated here.
//
// Returns nothing; the return code of Send() is not examined.
//
1112void XrdXrootdMonitor::Flush(XrdXrootdMonitor::MonRdrBuff *mP)
1113{
1114 int size;
1115
1116// Reset flush time but do not flush an empty buffer. We use the current time
1117// to make sure a record at least sits in the buffer a full flush period.
1118// Note that the deadline is reset even when the buffer turns out to be empty.
1119 mP->flushIt = static_cast<int>(time(0)) + autoFlush;
1120 if (mP->nextEnt <= 1) return;
1121
1122// Set ending timing mark and force a new one on the next fill
1123// rdrTOD is stored as-is by setTMurk (no byte swap is applied there).
1124 setTMurk(mP->Buff, mP->nextEnt, rdrTOD);
1125 mP->lastTOD = 0;
1126
1127// Fill in the header and in the process we will have the current time
1128// The size covers slots 0..nextEnt plus the packet header and 8 further bytes.
// NOTE(review): the reason for the extra 8 bytes is not visible in this
// function - presumably fixed data between the header and the entries; confirm
// against the redirect buffer layout.
//
1129 size = (mP->nextEnt+1)*sizeof(XrdXrootdMonRedir)+sizeof(XrdXrootdMonHeader)+8;
1130 fillHeader(&(mP->Buff->hdr), XROOTD_MON_MAPREDR, size);
1131
1132// Send off the buffer and reinitialize it
1133// Unlike the trace-buffer Flush(), the next entry index is reset to 0 here.
1134 Send(XROOTD_MON_REDR, (void *)(mP->Buff), size);
1135 mP->nextEnt = 0;
1136}
1137
1138/******************************************************************************/
1139/* M a r k */
1140/******************************************************************************/
1141
// Record the current timing window in this monitor's trace buffer. Depending
// on the buffer state this either flushes the buffer, updates the window mark
// that is already the last entry, or appends a new window mark. In every case
// lastWindow ends up equal to the window sampled on entry.
// Takes no arguments and returns nothing.
//
1142void XrdXrootdMonitor::Mark()
1143{
1144 kXR_int32 localWindow;
1145
1146// Get the current window marker. Since simple memory accesses are sufficiently
1147// synchronized, no need to lock this.
1148//
1149 localWindow = currWindow;
1150
1151// Using an update provided by Matevz Tadel, UCSD, if this is an I/O buffer
1152// mark then we will also flush the I/O buffer if all the following hold:
1153// a) flushing enabled, b) buffer not empty, and c) covers the flush time.
1154// We would normally do this during Tick() but that would require too much
1155// locking in the middle of an I/O path, so we do pseudo timed flushing.
1156// The buffer's starting window is read back from the window mark in slot 0.
// Flush() re-seeds slot 0 itself, so no mark needs to be added afterwards.
//
1157 if (this != altMon && autoFlash && nextEnt > 1)
1158 {kXR_int32 bufStartWindow =
1159 static_cast<kXR_int32>(ntohl(monBuff->info[0].arg2.Window));
1160 if (localWindow - bufStartWindow >= autoFlash)
1161 {Flush();
1162 lastWindow = localWindow;
1163 return;
1164 }
1165 }
1166
1167// Now, optimize placing the window mark in the buffer. Using another MT fix we
1168// set the end of the previous window to be lastwindow + sizeWindow (instead of
1169// localWindow) to prevent windows from being wrongly zero sized.
1170// Three cases follow:
//  1) the last entry is already a window mark: only its arg2.Window is
//     overwritten with the current window, so no new entry is consumed;
//  2) fewer than 8 free entries remain before lastEnt: the buffer is flushed
//     (NOTE(review): the rationale for the margin of 8 is not visible here);
//  3) otherwise a new window mark is appended, with arg1 holding the computed
//     end of the previous window and arg2 the current window.
//
1171 if (monBuff->info[nextEnt-1].arg0.id[0] == XROOTD_MON_WINDOW)
1172 {
1173 monBuff->info[nextEnt-1].arg2.Window =
1174 static_cast<kXR_int32>(htonl(localWindow));
1175 }
1176 else if (nextEnt+8 > lastEnt)
1177 {
1178 Flush();
1179 }
1180 else
1181 {
1182 monBuff->info[nextEnt].arg0.val = mySID;
1183 monBuff->info[nextEnt].arg0.id[0] = XROOTD_MON_WINDOW;
1184 monBuff->info[nextEnt].arg1.Window =
1185 static_cast<kXR_int32>(htonl(lastWindow + sizeWindow));
1186 monBuff->info[nextEnt].arg2.Window =
1187 static_cast<kXR_int32>(htonl(localWindow));
1188 nextEnt++;
1189 }
// Remember the window just recorded; it is the base for the next computed
// window end (lastWindow + sizeWindow).
//
1190 lastWindow = localWindow;
1191}
1192
1193/******************************************************************************/
1194/* S e n d */
1195/******************************************************************************/
1196
// Send a monitoring packet to each configured destination whose mode mask
// matches monMode.
//
// monMode - bit mask tested against monMode1 and monMode2 to select the
//           destination(s).
// buff    - the packet to send; when setseq is true it is treated as starting
//           with an XrdXrootdMonHeader and its pseq field is overwritten.
// blen    - number of bytes passed to the destination's Send().
// setseq  - when true, stamp the per-destination sequence number into the
//           header before each send.
//
// Returns the return code of the first destination's send if it is non-zero,
// otherwise that of the second destination. A destination that is not
// selected, or not configured, contributes 0.
//
1197int XrdXrootdMonitor::Send(int monMode, void *buff, int blen, bool setseq)
1198{
1199#ifndef NODEBUG
1200 const char *TraceID = "Monitor";
1201#endif
// sendMutex serializes all senders; seq1 and seq2 are the per-destination
// packet counters and are only touched while the mutex is held.
//
1202 static XrdSysMutex sendMutex;
1203 static int seq1=0, seq2=0;
1204 XrdXrootdMonHeader *mHdr=0;
1205 int rc1, rc2;
1206
1207// If we are to set sequence numbers, recast the buffer. We are assured that
1208// the buffer always starts with the standard monitor header.
1209//
1210 if (setseq) mHdr = static_cast<XrdXrootdMonHeader*>(buff);
1211
// Each condition parses as (monMode & monModeN) && InetDestN. Only the low 8
// bits of a counter are placed in the header. When both destinations are
// selected the same buffer is re-stamped with seq2 after the first send.
// NOTE(review): seq1/seq2 are plain ints that are incremented without bound.
//
1212 sendMutex.Lock();
1213 if (monMode & monMode1 && InetDest1)
1214 {if (mHdr) mHdr->pseq = (seq1++) & 0xff;
1215 rc1 = InetDest1->Send((char *)buff, blen);
1216 TRACE(DEBUG,blen <<" bytes sent to " <<Dest1 <<" rc=" <<rc1);
1217 }
1218 else rc1 = 0;
1219 if (monMode & monMode2 && InetDest2)
1220 {if (mHdr) mHdr->pseq = (seq2++) & 0xff;
1221 rc2 = InetDest2->Send((char *)buff, blen);
1222 TRACE(DEBUG,blen <<" bytes sent to " <<Dest2 <<" rc=" <<rc2);
1223 }
1224 else rc2 = 0;
1225 sendMutex.UnLock();
1226
// A non-zero rc1 takes precedence, so rc2 is only reported when rc1 is 0.
//
1227 return (rc1 ? rc1 : rc2);
1228}
1229
1230/******************************************************************************/
1231/* s t a r t C l o c k */
1232/******************************************************************************/
1233
// Initialize the monitoring clock: set the current window and the derived
// time values from the wall clock, and schedule the periodic tick job.
// Takes no arguments and returns nothing.
//
1234void XrdXrootdMonitor::startClock()
1235{
// The tick job is a function-local static, so a single instance is shared by
// all calls.
//
1236 static XrdXrootdMonitor_Tick MonTick;
1237 time_t Now;
1238
1239// Start the clock, Caller must have windowMutex locked, if necessary.
1240// currWindow is the current time truncated to kXR_int32; rdrTOD is the same
// value converted with htonl(); FlushTime is set autoFlush seconds ahead.
// MonTick.Set() is called unconditionally; only the Schedule() call, for
// sizeWindow seconds from now, is skipped when Sched is null.
//
1241 Now = time(0);
1242 currWindow = static_cast<kXR_int32>(Now);
1243 rdrTOD = htonl(currWindow);
1244 MonTick.Set(Sched, sizeWindow);
1245 FlushTime = autoFlush + currWindow;
1246 if (Sched) Sched->Schedule((XrdJob *)&MonTick, Now+sizeWindow);
1247}
unsigned int kXR_unt32
Definition XPtypes.hh:90
unsigned char kXR_char
Definition XPtypes.hh:65
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define DEBUG(x)
static XrdSysError eDest(0,"crypto_")
XrdOucString Path
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACE(act, x)
Definition XrdTrace.hh:63
const kXR_char XROOTD_MON_DISC
const kXR_char XROOTD_MON_MAPUEAC
const kXR_char XROOTD_MON_MAPUSER
const kXR_char XROOTD_MON_APPID
const kXR_char XROOTD_MON_REDSID
union XrdXrootdMonRedir::@174 arg1
const kXR_char XROOTD_MON_MAPIDNT
const kXR_char XROOTD_MON_MAPTRCE
char info[1024+256]
const kXR_char XROOTD_MON_CLOSE
const kXR_char XROOTD_MON_MAPPATH
const kXR_char XROOTD_MON_OPEN
XrdXrootdMonHeader hdr
const kXR_char XROOTD_MON_REDIRECT
const kXR_char XROOTD_MON_MAPTOKN
union XrdXrootdMonRedir::@172 arg0
const kXR_char XROOTD_MON_MAPREDR
#define setTMurk(TM_mb, TM_en, TM_tm)
#define setTMark(TM_mb, TM_en, TM_tm)
#define XROOTD_MON_IOV
#define XROOTD_MON_INFO
#define XROOTD_MON_ALL
#define XROOTD_MON_AUTH
#define XROOTD_MON_IO
#define XROOTD_MON_USER
#define XROOTD_MON_FSTA
#define XROOTD_MON_PATH
#define XROOTD_MON_FILE
#define XROOTD_MON_REDR
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
Definition XrdNetMsg.cc:70
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
static char * Ident(long long &mySID, char *iBuff, int iBlen, const char *iHost, const char *iProg, const char *iName, int Port)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Defaults(int intv, int opts, int iocnt, int fbsz)
Hello(const char *dest, char mode)
void Register(const char *Uname, const char *Hname, const char *Pname, unsigned int xSID=0)
void Report(const char *Info)
void Set(XrdScheduler *sp, int intvl)
static const int rdrMax
static XrdXrootdMonitor * altMon
static void Defaults(char *dest1, int m1, char *dest2, int m2)
void appID(char *id)
static time_t Tick()
void Disc(kXR_unt32 dictid, int csec, char Flags=0)
void Close(kXR_unt32 dictid, long long rTot, long long wTot)
static int Send(int mmode, void *buff, int size, bool setseq=true)
void Open(kXR_unt32 dictid, off_t fsize)
static kXR_unt32 GetDictID(bool hbo=false)
XrdScheduler * Sched

◆ setTMurk

#define setTMurk (   TM_mb,
  TM_en,
  TM_tm 
)
Value:
TM_mb->info[TM_en].arg0.Window = rdrWin; \
TM_mb->info[TM_en].arg1.Window = static_cast<kXR_int32>(TM_tm);

Definition at line 136 of file XrdXrootdMonitor.cc.

Variable Documentation

◆ XrdXrootdTrace

XrdSysTrace XrdXrootdTrace
extern

Definition at line 96 of file XrdFrmAdminMain.cc.