Skip to content

Commit b9bedf8

Browse files
committed
Transform load calls to use std::future
1 parent 644b233 commit b9bedf8

6 files changed

Lines changed: 107 additions & 32 deletions

File tree

include/openPMD/LoadStoreChunk.hpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "openPMD/auxiliary/ShareRawInternal.hpp"
55
#include "openPMD/auxiliary/UniquePtr.hpp"
66

7+
#include <future>
78
#include <optional>
89
#include <stdexcept>
910
#include <type_traits>
@@ -48,6 +49,18 @@ namespace internal
4849
};
4950
} // namespace internal
5051

52+
namespace auxiliary::detail
53+
{
54+
using future_to_shared_ptr_dataset_types =
55+
map_variant<as_shared_pointer, dataset_types>::type;
56+
} // namespace auxiliary::detail
57+
58+
enum class EnqueuePolicy
59+
{
60+
Defer,
61+
Immediate
62+
};
63+
5164
/** Basic configuration for a Load/Store operation.
5265
*
5366
* @tparam ChildClass CRT pattern.
@@ -136,11 +149,13 @@ class ConfigureLoadStore : protected internal::ConfigureLoadStoreData
136149
[[nodiscard]] auto enqueueStore(F &&createBuffer) -> DynamicMemoryView<T>;
137150

138151
template <typename T>
139-
[[nodiscard]] auto enqueueLoad() -> std::shared_ptr<T>;
152+
[[nodiscard]] auto enqueueLoad() -> std::future<std::shared_ptr<T>>;
153+
154+
template <typename T>
155+
[[nodiscard]] auto load(EnqueuePolicy) -> std::shared_ptr<T>;
140156

141-
using shared_ptr_dataset_types = auxiliary::detail::
142-
map_variant<auxiliary::detail::as_shared_pointer, dataset_types>::type;
143-
[[nodiscard]] auto enqueueLoadVariant() -> shared_ptr_dataset_types;
157+
[[nodiscard]] auto enqueueLoadVariant()
158+
-> std::future<auxiliary::detail::future_to_shared_ptr_dataset_types>;
144159
};
145160

146161
/** Configuration for a Store operation with a buffer type.
@@ -230,6 +245,8 @@ class ConfigureLoadStoreFromBuffer
230245

231246
public:
232247
auto enqueueLoad() -> void;
248+
249+
auto load(EnqueuePolicy) -> void;
233250
};
234251
} // namespace openPMD
235252

include/openPMD/RecordComponent.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ class RecordComponent : public BaseRecordComponent
139139
friend class ConfigureStoreChunkFromBuffer;
140140
template <typename>
141141
friend class ConfigureLoadStoreFromBuffer;
142+
friend struct VisitorEnqueueLoadVariant;
142143

143144
public:
144145
enum class Allocation

include/openPMD/RecordComponent.tpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ inline std::shared_ptr<T> RecordComponent::loadChunk(Offset o, Extent e)
7676
operation.extent(std::move(e));
7777
}
7878

79-
return operation.enqueueLoad<T>();
79+
return operation.load<T>(EnqueuePolicy::Defer);
8080
}
8181

8282
template <typename T>
@@ -100,15 +100,15 @@ RecordComponent::loadChunkAllocate_impl(internal::LoadStoreConfig cfg)
100100
.offset(std::move(o))
101101
.extent(std::move(e))
102102
.withSharedPtr(newData)
103-
.enqueueLoad();
103+
.load(EnqueuePolicy::Defer);
104104
return newData;
105105
#else
106106
auto newData = std::shared_ptr<T[]>(new T[numPoints]);
107107
prepareLoadStore()
108108
.offset(std::move(o))
109109
.extent(std::move(e))
110110
.withSharedPtr(newData)
111-
.enqueueLoad();
111+
.load(EnqueuePolicy::Defer);
112112
return std::static_pointer_cast<T>(std::move(newData));
113113
#endif
114114
}
@@ -134,7 +134,7 @@ inline void RecordComponent::loadChunk(
134134
operation.extent(std::move(e));
135135
}
136136

137-
operation.withSharedPtr(std::move(data)).enqueueLoad();
137+
operation.withSharedPtr(std::move(data)).load(EnqueuePolicy::Defer);
138138
}
139139

140140
template <typename T_with_extent>
@@ -215,7 +215,7 @@ inline void RecordComponent::loadChunkRaw(T *ptr, Offset offset, Extent extent)
215215
.offset(std::move(offset))
216216
.extent(std::move(extent))
217217
.withRawPtr(ptr)
218-
.enqueueLoad();
218+
.load(EnqueuePolicy::Defer);
219219
}
220220

221221
template <typename T>

src/LoadStoreChunk.cpp

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// comment to keep clang-format from reordering
1313
#include "openPMD/DatatypeMacros.hpp"
1414

15+
#include <future>
1516
#include <memory>
1617
#include <optional>
1718
#include <stdexcept>
@@ -147,32 +148,65 @@ auto ConfigureLoadStore<ChildClass>::enqueueStore() -> DynamicMemoryView<T>
147148

148149
template <typename ChildClass>
149150
template <typename T>
150-
auto ConfigureLoadStore<ChildClass>::enqueueLoad() -> std::shared_ptr<T>
151+
auto ConfigureLoadStore<ChildClass>::enqueueLoad()
152+
-> std::future<std::shared_ptr<T>>
151153
{
152-
return m_rc.loadChunkAllocate_impl<T>(storeChunkConfig());
154+
auto res = m_rc.loadChunkAllocate_impl<T>(storeChunkConfig());
155+
return std::async(
156+
std::launch::deferred,
157+
[res_lambda = std::move(res), rc = m_rc]() mutable {
158+
rc.seriesFlush();
159+
return res_lambda;
160+
});
153161
}
154162

155-
namespace
163+
template <typename ChildClass>
164+
template <typename T>
165+
auto ConfigureLoadStore<ChildClass>::load(EnqueuePolicy ep)
166+
-> std::shared_ptr<T>
156167
{
157-
template <typename ConfigureLoadStore_t>
158-
struct VisitorEnqueueLoadVariant
168+
auto res = m_rc.loadChunkAllocate_impl<T>(storeChunkConfig());
169+
switch (ep)
159170
{
160-
template <typename T>
161-
static auto call(RecordComponent const &, ConfigureLoadStore_t &cfg) ->
162-
typename ConfigureLoadStore_t::shared_ptr_dataset_types
163-
{
164-
return cfg.template enqueueLoad<T>();
165-
}
166-
};
167-
} // namespace
171+
case EnqueuePolicy::Defer:
172+
break;
173+
case EnqueuePolicy::Immediate:
174+
m_rc.seriesFlush();
175+
break;
176+
}
177+
return res;
178+
}
179+
180+
struct VisitorEnqueueLoadVariant
181+
{
182+
template <typename T>
183+
static auto call(RecordComponent &rc, internal::LoadStoreConfig cfg)
184+
-> std::future<auxiliary::detail::future_to_shared_ptr_dataset_types>
185+
{
186+
auto res = rc.loadChunkAllocate_impl<T>(std::move(cfg));
187+
return std::async(
188+
std::launch::deferred,
189+
[res_lambda = std::move(res), rc_lambda = rc]() mutable
190+
-> auxiliary::detail::future_to_shared_ptr_dataset_types {
191+
rc_lambda.seriesFlush();
192+
return res_lambda;
193+
});
194+
}
195+
196+
static auto non_templated_implementation(
197+
RecordComponent &rc, internal::LoadStoreConfig cfg)
198+
-> std::future<auxiliary::detail::future_to_shared_ptr_dataset_types>
199+
{
200+
return rc.visit<VisitorEnqueueLoadVariant>(std::move(cfg));
201+
}
202+
};
168203

169204
template <typename ChildClass>
170205
auto ConfigureLoadStore<ChildClass>::enqueueLoadVariant()
171-
-> shared_ptr_dataset_types
206+
-> std::future<auxiliary::detail::future_to_shared_ptr_dataset_types>
172207
{
173-
return m_rc
174-
.visit<VisitorEnqueueLoadVariant<ConfigureLoadStore<ChildClass>>>(
175-
*this);
208+
return VisitorEnqueueLoadVariant::non_templated_implementation(
209+
m_rc, this->storeChunkConfig());
176210
}
177211

178212
template <typename Ptr_Type, typename ChildClass>
@@ -245,9 +279,31 @@ auto ConfigureLoadStoreFromBuffer<Ptr_Type>::enqueueLoad() -> void
245279
std::move(this->m_buffer), this->storeChunkConfig());
246280
}
247281

282+
template <typename Ptr_Type>
283+
auto ConfigureLoadStoreFromBuffer<Ptr_Type>::load(EnqueuePolicy ep) -> void
284+
{
285+
this->m_rc.loadChunk_impl(
286+
std::move(this->m_buffer), this->storeChunkConfig());
287+
switch (ep)
288+
{
289+
290+
case EnqueuePolicy::Defer:
291+
break;
292+
case EnqueuePolicy::Immediate:
293+
this->m_rc.seriesFlush();
294+
break;
295+
}
296+
}
297+
298+
/* clang-format would destroy the NOLINT comments */
299+
// clang-format off
248300
#define INSTANTIATE_METHOD_TEMPLATES(base_class, dtype) \
249-
template auto base_class::enqueueStore() -> DynamicMemoryView<dtype>; \
250-
template auto base_class::enqueueLoad() -> std::shared_ptr<dtype>;
301+
template auto base_class::enqueueStore()->DynamicMemoryView<dtype>; \
302+
template auto base_class::enqueueLoad() \
303+
/* NOLINTNEXTLINE(bugprone-macro-parentheses) */ \
304+
->std::future<std::shared_ptr<dtype>>; \
305+
template auto base_class::load(EnqueuePolicy)->std::shared_ptr<dtype>;
306+
// clang-format on
251307

252308
#define INSTANTIATE_METHOD_TEMPLATES_FOR_BASE(dtype) \
253309
INSTANTIATE_METHOD_TEMPLATES(ConfigureLoadStore<void>, dtype)

test/ParallelIOTest.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -495,13 +495,14 @@ void available_chunks_test(std::string const &file_ending)
495495
auto E_y = it0.meshes["E"]["y"];
496496
auto width = E_y.getExtent()[1];
497497
auto first_row =
498-
E_y.prepareLoadStore().extent({1, width}).enqueueLoad<int>();
498+
E_y.prepareLoadStore().extent({1, width}).enqueueLoad<int>().get();
499499
auto middle_rows = E_y.prepareLoadStore()
500500
.offset({1, 0})
501501
.extent({3, width})
502-
.enqueueLoad<int>();
502+
.enqueueLoad<int>()
503+
.get();
503504
auto last_row =
504-
E_y.prepareLoadStore().offset({4, 0}).enqueueLoad<int>();
505+
E_y.prepareLoadStore().offset({4, 0}).enqueueLoad<int>().get();
505506
read.flush();
506507

507508
for (auto row : [&]() -> std::vector<std::shared_ptr<int> *> {

test/SerialIOTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1680,7 +1680,7 @@ inline void write_test(const std::string &backend)
16801680
auto opaqueTypeDataset = rc.visit<ReadFromAnyType>();
16811681

16821682
auto variantTypeDataset = rc.loadChunkVariant();
1683-
auto variantTypeDataset2 = rc.prepareLoadStore().enqueueLoadVariant();
1683+
auto variantTypeDataset2 = rc.prepareLoadStore().enqueueLoadVariant().get();
16841684
rc.seriesFlush();
16851685
for (auto ptr : {&variantTypeDataset, &variantTypeDataset2})
16861686
{

0 commit comments

Comments
 (0)