Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
dcdb
dcdb
Commits
06805839
Commit
06805839
authored
Jun 07, 2019
by
Micha Mueller
Browse files
Dcdbpusher: add sources for unfinished Caliper service
parent
79201760
Changes
3
Hide whitespace changes
Inline
Side-by-side
dcdbpusher/Caliper/dcdbpusher/CMakeLists.txt
0 → 100644
View file @
06805839
set
(
CALIPER_DCDBPUSHER_SOURCES
DcdbPusher.cpp
)
add_service_sources
(
${
CALIPER_DCDBPUSHER_SOURCES
}
)
add_caliper_service
(
"dcdbpusher"
)
dcdbpusher/Caliper/dcdbpusher/DcdbPusher.cpp
0 → 100644
View file @
06805839
// This program is to be used with Caliper.
// Caliper belongs to Lawrence Livermore National Security, LLC.
// LLNL-CODE-678900
// All rights reserved.
//
// For details, see https://github.com/scalability-llnl/Caliper.
//================================================================================
// Name : DcdbPusher.cpp
// Author : Micha Mueller
// Copyright : Leibniz Supercomputing Centre
// Description : Caliper service to forward snapshot data to dcdb pusher.
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
/**
* @file DcdbPusher.cpp
*
* @brief Caliper service to forward snapshot data to dcdb pusher.
*
* @details Service for the runtime processing level. Relies on the sampler,
* symbollookup and timestamp service. To avoid a detour to the flush
* level the symbollookup service is invoked manually when processing a
* snapshot. This way all required information is available without
* having to explicitly trigger a flush and the relevant data can be
* instantly forwarded. Unneeded atttributes are stripped off and
* discarded.
*
*/
#include "caliper/CaliperService.h"
#include "caliper/Caliper.h"
#include "caliper/SnapshotRecord.h"
#include "caliper/common/Log.h"
#include "caliper/common/RuntimeConfig.h"
using
namespace
cali
;
namespace
{
class
DcdbPusher
{
private:
static
const
ConfigSet
::
Entry
s_configdata
[];
//TODO reduce snapshot frequency during runtime
unsigned
snapshot_threshold
=
10
;
unsigned
threshold_exceeded
=
0
;
unsigned
snapshots_processed
=
0
;
unsigned
snapshots_failed
=
0
;
bool
initialized
=
false
;
Attribute
sampler
{
Attribute
::
invalid
};
Attribute
symbol_fun
{
Attribute
::
invalid
};
Attribute
timestamp
{
Attribute
::
invalid
};
void
post_init_cb
(
Caliper
*
c
,
Channel
*
chn
)
{
// manually issue a pre-flush event to set up the SymbolLookup service
chn
->
events
().
pre_flush_evt
(
c
,
chn
,
nullptr
);
// Check if required services sampler, symbollookup and timestamp are
// active by searching for identifying attributes.
//TODO support data collection if event triggered snapshots are used
sampler
=
c
->
get_attribute
(
"cali.sampler.pc"
);
symbol_fun
=
c
->
get_attribute
(
"source.function#cali.sampler.pc"
);
timestamp
=
c
->
get_attribute
(
"time.timestamp"
);
if
(
sampler
==
Attribute
::
invalid
||
symbol_fun
==
Attribute
::
invalid
||
timestamp
==
Attribute
::
invalid
)
{
Log
(
1
).
stream
()
<<
chn
->
name
()
<<
": DcdbPusher: not all required services "
"sampler, symbollookup and timestamp are running."
<<
std
::
endl
;
return
;
}
initialized
=
true
;
}
void
process_snapshot_cb
(
Caliper
*
c
,
Channel
*
chn
,
const
SnapshotRecord
*
,
const
SnapshotRecord
*
sbuf
)
{
++
snapshots_processed
;
//TODO check if signal safety is required
//if (c->is_signal()) {
// ++snapshots_failed;
// return;
//}
if
(
!
initialized
)
{
++
snapshots_failed
;
return
;
}
SnapshotRecord
::
Sizes
sizes
=
sbuf
->
size
();
if
((
sizes
.
n_nodes
+
sizes
.
n_immediate
)
==
0
)
{
++
snapshots_failed
;
return
;
}
std
::
vector
<
Entry
>
rec
=
sbuf
->
to_entrylist
();
// manually issue a postprocess-snapshot event to enrich snapshot with
// symbollookup data
chn
->
events
().
postprocess_snapshot
(
c
,
chn
,
rec
);
Log
(
1
).
stream
()
<<
chn
->
name
()
<<
": DcdbPusher: printing snapshot"
<<
std
::
endl
;
for
(
const
Entry
&
e
:
rec
)
{
//print attribute values for timestamp and looked up function symbol name
cali_id_t
entryId
=
c
->
get_attribute
(
e
.
attribute
()).
id
();
if
(
entryId
==
symbol_fun
.
id
()
||
entryId
==
timestamp
.
id
())
{
Log
(
1
).
stream
()
<<
chn
->
name
()
<<
": DcdbPusher: "
<<
c
->
get_attribute
(
e
.
attribute
()).
name
()
<<
" = "
<<
e
.
value
().
to_string
()
<<
std
::
endl
;
}
}
Log
(
1
).
stream
()
<<
chn
->
name
()
<<
std
::
endl
;
}
void
finish_cb
(
Caliper
*
c
,
Channel
*
chn
)
{
if
(
threshold_exceeded
>
0
)
{
Log
(
1
).
stream
()
<<
chn
->
name
()
<<
": DcdbPusher: threshold was exceeded "
<<
threshold_exceeded
<<
" times."
<<
std
::
endl
;
}
Log
(
1
).
stream
()
<<
chn
->
name
()
<<
": DcdbPusher: "
<<
snapshots_processed
<<
" snapshots processed of which "
<<
snapshots_failed
<<
" failed."
<<
std
::
endl
;
}
DcdbPusher
(
Caliper
*
c
,
Channel
*
chn
)
{
ConfigSet
config
=
chn
->
config
().
init
(
"dcdbpusher"
,
s_configdata
);
snapshot_threshold
=
config
.
get
(
"snapshot_threshold"
).
to_uint
();
}
public:
~
DcdbPusher
()
{
}
static
void
dcdbpusher_register
(
Caliper
*
c
,
Channel
*
chn
)
{
DcdbPusher
*
instance
=
new
DcdbPusher
(
c
,
chn
);
chn
->
events
().
post_init_evt
.
connect
(
[
instance
](
Caliper
*
c
,
Channel
*
chn
){
instance
->
post_init_cb
(
c
,
chn
);
});
chn
->
events
().
process_snapshot
.
connect
(
[
instance
](
Caliper
*
c
,
Channel
*
chn
,
const
SnapshotRecord
*
trigger
,
const
SnapshotRecord
*
snapshot
){
instance
->
process_snapshot_cb
(
c
,
chn
,
trigger
,
snapshot
);
});
chn
->
events
().
finish_evt
.
connect
(
[
instance
](
Caliper
*
c
,
Channel
*
chn
){
instance
->
finish_cb
(
c
,
chn
);
delete
instance
;
});
Log
(
1
).
stream
()
<<
chn
->
name
()
<<
": Registered dcdbpusher service"
<<
std
::
endl
;
}
};
// class DcdbPusher
const
ConfigSet
::
Entry
DcdbPusher
::
s_configdata
[]
=
{
{
"snapshot_threshold"
,
CALI_TYPE_UINT
,
"10"
,
"Maximum number of snapshots per second we are allowed to process"
,
"Maximum number of snapshots per second we are allowed to process.
\n
"
"If the threshold is exceeded snapshots get discarded until the
\n
"
"limit is met again."
},
ConfigSet
::
Terminator
};
}
// namespace
namespace
cali
{
CaliperService
dcdbpusher_service
{
"dcdbpusher"
,
::
DcdbPusher
::
dcdbpusher_register
};
}
dcdbpusher/Caliper/patch
0 → 100644
View file @
06805839
--- a/src/services/CMakeLists.txt 2019-06-07 10:33:20.036516054 +0200
+++ b/src/services/CMakeLists.txt 2019-06-06 17:57:54.096623744 +0200
@@ -55,6 +55,7 @@
add_subdirectory(cupti)
add_subdirectory(cuptitrace)
endif()
+add_subdirectory(dcdbpusher)
add_subdirectory(debug)
add_subdirectory(env)
add_subdirectory(event)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment