Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ option(USE_PTHASH "Whether to use pthash" OFF)
option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF
option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF
# option(LABEL_TYPE "The label type of the graph, valid values: uint16, uint8" "uint8") # The type of the label in the graph
SET(LABEL_TYPE "uint8_t" CACHE STRING "The label type of the graph, valid values: uint16_t, uint8_t")

#print options
message(STATUS "Build test: ${BUILD_TEST}")
Expand Down Expand Up @@ -58,6 +60,12 @@ if(USE_PTHASH)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/murmurhash)
endif()

# check label_type in {uint8, uint16}
if(NOT LABEL_TYPE STREQUAL "uint8_t" AND NOT LABEL_TYPE STREQUAL "uint16_t")
message(FATAL_ERROR "LABEL_TYPE must be uint8 or uint16, but got ${LABEL_TYPE}")
endif()
add_compile_definitions(FLEX_LABEL_TYPE=${LABEL_TYPE})

set(DEFAULT_BUILD_TYPE "Release")
if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
message(STATUS "Setting build type to '${DEFAULT_BUILD_TYPE}' as none was specified.")
Expand Down
161 changes: 79 additions & 82 deletions flex/engines/graph_db/database/update_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ UpdateTransaction::UpdateTransaction(const GraphDBSession& session,
extra_vertex_properties_[i].resize(4096);
}

size_t csr_num = 2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_;
size_t csr_num = 2 * schema().get_edge_triplet_num();
added_edges_.resize(csr_num);
updated_edge_data_.resize(csr_num);
}
Expand Down Expand Up @@ -560,7 +560,6 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
updated_edge_data;

size_t vertex_label_num = graph.schema().vertex_label_num();
size_t edge_label_num = graph.schema().edge_label_num();

for (label_t idx = 0; idx < vertex_label_num; ++idx) {
if (graph.lf_indexers_[idx].get_type() == PropertyType::kInt64) {
Expand Down Expand Up @@ -603,7 +602,7 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
extra_vertex_properties[i].resize(4096);
}

size_t csr_num = 2 * vertex_label_num * vertex_label_num * edge_label_num;
size_t csr_num = 2 * graph.schema().get_edge_triplet_num();
added_edges.resize(csr_num);
updated_edge_data.resize(csr_num);

Expand Down Expand Up @@ -681,16 +680,14 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,

size_t UpdateTransaction::get_in_csr_index(label_t src_label, label_t dst_label,
label_t edge_label) const {
return src_label * vertex_label_num_ * edge_label_num_ +
dst_label * edge_label_num_ + edge_label;
return graph_.schema().get_edge_triplet_id(src_label, dst_label, edge_label);
}

size_t UpdateTransaction::get_out_csr_index(label_t src_label,
label_t dst_label,
label_t edge_label) const {
return src_label * vertex_label_num_ * edge_label_num_ +
dst_label * edge_label_num_ + edge_label +
vertex_label_num_ * vertex_label_num_ * edge_label_num_;
return graph_.schema().get_edge_triplet_id(dst_label, src_label, edge_label) +
graph_.schema().get_edge_triplet_num();
}

bool UpdateTransaction::oid_to_lid(label_t label, const Any& oid,
Expand Down Expand Up @@ -820,89 +817,89 @@ void UpdateTransaction::applyVerticesUpdates() {
}

void UpdateTransaction::applyEdgesUpdates() {
for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
for (label_t edge_label = 0; edge_label < edge_label_num_; ++edge_label) {
size_t oe_csr_index =
get_out_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[oe_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
for (size_t index = 0; index < graph_.schema().get_edge_triplet_num();
++index) {
auto edge_triplet = graph_.schema().get_edge_triplet(index);
label_t src_label = std::get<0>(edge_triplet);
label_t dst_label = std::get<1>(edge_triplet);
label_t edge_label = std::get<2>(edge_triplet);
size_t oe_csr_index = get_out_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[oe_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}

std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_outgoing_edges_mut(src_label, pair.first, dst_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_outgoing_edges_mut(src_label, pair.first, dst_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
}
}

for (auto& pair : added_edges_[oe_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;
for (auto& pair : added_edges_[oe_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;

if (add_list.empty()) {
continue;
}
std::sort(add_list.begin(), add_list.end());
auto& edge_data = updated_edge_data_[oe_csr_index].at(v);
for (size_t idx = 0; idx < add_list.size(); ++idx) {
if (idx && add_list[idx] == add_list[idx - 1])
continue;
auto u = add_list[idx];
auto value = edge_data.at(u).first;
grape::InArchive iarc;
serialize_field(iarc, value);
grape::OutArchive oarc(std::move(iarc));
graph_.IngestEdge(src_label, v, dst_label, u, edge_label,
timestamp_, oarc, alloc_);
}
}
if (add_list.empty()) {
continue;
}
std::sort(add_list.begin(), add_list.end());
auto& edge_data = updated_edge_data_[oe_csr_index].at(v);
for (size_t idx = 0; idx < add_list.size(); ++idx) {
if (idx && add_list[idx] == add_list[idx - 1])
continue;
auto u = add_list[idx];
auto value = edge_data.at(u).first;
grape::InArchive iarc;
serialize_field(iarc, value);
grape::OutArchive oarc(std::move(iarc));
graph_.IngestEdge(src_label, v, dst_label, u, edge_label, timestamp_,
oarc, alloc_);
}
}
}

for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
for (label_t edge_label = 0; edge_label < edge_label_num_; ++edge_label) {
size_t ie_csr_index =
get_in_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[ie_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_incoming_edges_mut(dst_label, pair.first, src_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
for (size_t index = 0; index < graph_.schema().get_edge_triplet_num();
++index) {
auto edge_triplet = graph_.schema().get_edge_triplet(index);
label_t src_label = std::get<0>(edge_triplet);
label_t dst_label = std::get<1>(edge_triplet);
label_t edge_label = std::get<2>(edge_triplet);
size_t ie_csr_index = get_in_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[ie_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_incoming_edges_mut(dst_label, pair.first, src_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
}
Expand Down
59 changes: 26 additions & 33 deletions flex/engines/graph_db/runtime/common/accessors.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,16 +419,16 @@ class MultiPropsEdgePropertyPathAccessor : public IAccessor {
MultiPropsEdgePropertyPathAccessor(const GraphInterface& graph,
const std::string& prop_name,
const Context& ctx, int tag)
: col_(*std::dynamic_pointer_cast<IEdgeColumn>(ctx.get(tag))) {
: graph_(graph),
col_(*std::dynamic_pointer_cast<IEdgeColumn>(ctx.get(tag))) {
const auto& labels = col_.get_labels();
vertex_label_num_ = graph.schema().vertex_label_num();
edge_label_num_ = graph.schema().edge_label_num();
prop_index_.resize(
2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_,
std::numeric_limits<size_t>::max());
prop_index_.resize(graph.schema().get_edge_triplet_num(),
std::numeric_limits<size_t>::max());
for (auto& label : labels) {
size_t idx = label.src_label * vertex_label_num_ * edge_label_num_ +
label.dst_label * edge_label_num_ + label.edge_label;
size_t idx = graph.schema().get_edge_triplet_id(
label.src_label, label.dst_label, label.edge_label);
const auto& names = graph.schema().get_edge_property_names(
label.src_label, label.dst_label, label.edge_label);
for (size_t i = 0; i < names.size(); ++i) {
Expand Down Expand Up @@ -478,8 +478,8 @@ class MultiPropsEdgePropertyPathAccessor : public IAccessor {
bool is_optional() const override { return col_.is_optional(); }

size_t get_index(const LabelTriplet& label) const {
size_t idx = label.src_label * vertex_label_num_ * edge_label_num_ +
label.dst_label * edge_label_num_ + label.edge_label;
size_t idx = graph_.schema().get_edge_triplet_id(
label.src_label, label.dst_label, label.edge_label);
return prop_index_[idx];
}

Expand All @@ -495,6 +495,7 @@ class MultiPropsEdgePropertyPathAccessor : public IAccessor {
}

private:
const GraphInterface& graph_;
const IEdgeColumn& col_;
std::vector<size_t> prop_index_;
size_t vertex_label_num_;
Expand Down Expand Up @@ -556,32 +557,23 @@ class MultiPropsEdgePropertyEdgeAccessor : public IAccessor {
using elem_t = T;

MultiPropsEdgePropertyEdgeAccessor(const GraphInterface& graph,
const std::string& name) {
const std::string& name)
: graph_(graph) {
edge_label_num_ = graph.schema().edge_label_num();
vertex_label_num_ = graph.schema().vertex_label_num();
indexs.resize(2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_,
Copy link
Copy Markdown
Member Author

@zhanglei1949 zhanglei1949 Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems no need to resize to 2* csr_num? @liulx20

indexs.resize(graph.schema().get_edge_triplet_num(),
std::numeric_limits<size_t>::max());
for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
auto src = graph.schema().get_vertex_label_name(src_label);
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
auto dst = graph.schema().get_vertex_label_name(dst_label);
for (label_t edge_label = 0; edge_label < edge_label_num_;
++edge_label) {
auto edge = graph.schema().get_edge_label_name(edge_label);
if (!graph.schema().exist(src, dst, edge)) {
continue;
}
size_t idx = src_label * vertex_label_num_ * edge_label_num_ +
dst_label * edge_label_num_ + edge_label;
const std::vector<std::string>& names =
graph.schema().get_edge_property_names(src_label, dst_label,
edge_label);
for (size_t i = 0; i < names.size(); ++i) {
if (names[i] == name) {
indexs[idx] = i;
break;
}
}

for (size_t index = 0; index < graph.schema().get_edge_triplet_num();
++index) {
auto label = graph.schema().get_edge_triplet(index);
const std::vector<std::string>& names =
graph.schema().get_edge_property_names(
std::get<0>(label), std::get<1>(label), std::get<2>(label));
for (size_t i = 0; i < names.size(); ++i) {
if (names[i] == name) {
indexs[index] = i;
break;
}
}
}
Expand Down Expand Up @@ -613,12 +605,13 @@ class MultiPropsEdgePropertyEdgeAccessor : public IAccessor {
}

size_t get_index(const LabelTriplet& label) const {
size_t idx = label.src_label * vertex_label_num_ * edge_label_num_ +
label.dst_label * edge_label_num_ + label.edge_label;
size_t idx = graph_.schema().get_edge_triplet_id(
label.src_label, label.dst_label, label.edge_label);
return indexs[idx];
}

private:
const GraphInterface& graph_;
std::vector<size_t> indexs;
size_t vertex_label_num_;
size_t edge_label_num_;
Expand Down
7 changes: 4 additions & 3 deletions flex/engines/graph_db/runtime/common/rt_any.cc
Original file line number Diff line number Diff line change
Expand Up @@ -826,12 +826,13 @@ void RTAny::sink(const GraphReadInterface& graph, int id,
auto [label, src, dst, prop, dir] = this->as_edge();
e->mutable_src_label()->set_id(label.src_label);
e->mutable_dst_label()->set_id(label.dst_label);
auto edge_label = generate_edge_label_id(label.src_label, label.dst_label,
label.edge_label);

auto edge_triplet_id = graph.schema().get_edge_triplet_id(
label.src_label, label.dst_label, label.edge_label);
e->mutable_label()->set_id(label.edge_label);
e->set_src_id(encode_unique_vertex_id(label.src_label, src));
e->set_dst_id(encode_unique_vertex_id(label.dst_label, dst));
e->set_id(encode_unique_edge_id(edge_label, src, dst));
e->set_id(encode_unique_edge_id(edge_triplet_id, src, dst));
auto& prop_names = graph.schema().get_edge_property_names(
label.src_label, label.dst_label, label.edge_label);
if (prop_names.size() == 1) {
Expand Down
27 changes: 0 additions & 27 deletions flex/engines/graph_db/runtime/common/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,6 @@ std::pair<label_t, vid_t> decode_unique_vertex_id(uint64_t unique_id) {
GlobalId::get_vid(unique_id)};
}

uint32_t generate_edge_label_id(label_t src_label_id, label_t dst_label_id,
label_t edge_label_id) {
uint32_t unique_edge_label_id = src_label_id;
static constexpr int num_bits = sizeof(label_t) * 8;
static_assert(num_bits * 3 <= sizeof(uint32_t) * 8,
"label_t is too large to be encoded in 32 bits");
unique_edge_label_id = unique_edge_label_id << num_bits;
unique_edge_label_id = unique_edge_label_id | dst_label_id;
unique_edge_label_id = unique_edge_label_id << num_bits;
unique_edge_label_id = unique_edge_label_id | edge_label_id;
return unique_edge_label_id;
}

std::tuple<label_t, label_t, label_t> decode_edge_label_id(
uint32_t edge_label_id) {
static constexpr int num_bits = sizeof(label_t) * 8;
static_assert(num_bits * 3 <= sizeof(uint32_t) * 8,
"label_t is too large to be encoded in 32 bits");
auto mask = (1 << num_bits) - 1;
label_t edge_label = edge_label_id & mask;
edge_label_id = edge_label_id >> num_bits;
label_t dst_label = edge_label_id & mask;
edge_label_id = edge_label_id >> num_bits;
label_t src_label = edge_label_id & mask;
return std::make_tuple(src_label, dst_label, edge_label);
}

int64_t encode_unique_edge_id(uint32_t label_id, vid_t src, vid_t dst) {
// We assume label_id is only used by 24 bits.
int64_t unique_edge_id = label_id;
Expand Down
Loading