Skip to content

Commit f94ae29

Browse files
committed
feat: Extend \dt psql command output with shard metadata (#709)
Intercept \dt command, and add a `Shard` column to the output. Add a flag to `Route` that indicates if \dt is being executed so the Shard column is conditionally applied. Add `shard_map` HashMap to `Route` as well that stores tables with their corresponding shard. Introduce `forward_with_shard` function in backend/pool/connection/binding.rs that exposes the shard_map property to be streamed in the query engine. Add engine logic to populate the new column correctly and handle tables sharded across multiple databases Ex. output: List of tables Schema | Name | Type | Owner | Shard --------+-----------+-------+--------+--------- public | only_on_0 | table | ubuntu | 0 public | only_on_1 | table | ubuntu | 1 public | only_on_2 | table | ubuntu | 2 public | users | table | ubuntu | 0, 1, 2 Signed-off-by: Aditya Gollamudi <adigollamudi@gmail.com>
1 parent e717983 commit f94ae29

7 files changed

Lines changed: 131 additions & 5 deletions

File tree

pgdog/src/backend/pool/connection/binding.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
33
use crate::{
44
frontend::{client::query_engine::TwoPcPhase, ClientRequest},
5-
net::{parameter::Parameters, BackendKeyData, ProtocolMessage, Query},
5+
net::{parameter::Parameters, BackendKeyData, ProtocolMessage, Query, Message},
66
state::State,
77
};
88

99
use futures::future::join_all;
10+
use std::collections::HashMap;
1011

1112
use super::*;
1213

@@ -53,6 +54,15 @@ impl Binding {
5354
self.disconnect();
5455
}
5556

57+
pub fn forward_with_shard(&self) -> Option<HashMap<String, Vec<usize>>>{
58+
match self {
59+
Binding::MultiShard(_shards, state) => {
60+
state.table_shard_map()
61+
}
62+
_ => None
63+
}
64+
}
65+
5666
/// Are we connected to a backend?
5767
pub fn connected(&self) -> bool {
5868
match self {
@@ -91,13 +101,22 @@ impl Binding {
91101
return Ok(message);
92102
}
93103
let mut read = false;
94-
for server in shards.iter_mut() {
104+
105+
for (shard, server) in shards.iter_mut().enumerate() {
95106
if !server.has_more_messages() {
96107
continue;
97108
}
98109

99110
let message = server.read().await?;
100111

112+
if state.display_table() {
113+
if let Some(table_name) = message.table_name_from_dt().unwrap() {
114+
let mut map: HashMap<String, Vec<usize>> = state.table_shard_map().unwrap_or_default();
115+
map.entry(table_name.clone()).or_insert_with(Vec::new).push(shard);
116+
state.set_table_shard_map(Some(map));
117+
}
118+
}
119+
101120
read = true;
102121
if let Some(message) = state.forward(message)? {
103122
return Ok(message);

pgdog/src/backend/pool/connection/multi_shard/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Multi-shard connection state.
22
33
use context::Context;
4+
use std::collections::HashMap;
45

56
use crate::{
67
frontend::{router::Route, PreparedStatements},
@@ -345,4 +346,17 @@ impl MultiShard {
345346
}
346347
}
347348
}
349+
350+
pub fn display_table(&self) -> bool {
351+
self.route.display_table()
352+
}
353+
354+
pub fn set_table_shard_map(&mut self, map: Option<HashMap<String, Vec<usize>>>) {
355+
self.route.set_table_shard_map(map);
356+
}
357+
358+
pub fn table_shard_map(&self) -> Option<HashMap<String, Vec<usize>>> {
359+
self.route.table_shard_map()
360+
361+
}
348362
}

pgdog/src/frontend/client/query_engine/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::{
1010
state::State,
1111
};
1212

13+
use std::collections::HashSet;
1314
use tracing::debug;
1415

1516
pub mod connect;
@@ -78,6 +79,7 @@ pub struct QueryEngine {
7879
notify_buffer: NotifyBuffer,
7980
pending_explain: Option<ExplainResponseState>,
8081
hooks: QueryEngineHooks,
82+
seen_tables: HashSet<String>
8183
}
8284

8385
impl QueryEngine {
@@ -105,6 +107,7 @@ impl QueryEngine {
105107
pending_explain: None,
106108
begin_stmt: None,
107109
router: Router::default(),
110+
seen_tables: HashSet::new(),
108111
})
109112
}
110113

pgdog/src/frontend/client/query_engine/query.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
router::parser::{explain_trace::ExplainTrace, rewrite::statement::plan::RewriteResult},
88
},
99
net::{
10-
DataRow, FromBytes, Message, Protocol, ProtocolMessage, Query, ReadyForQuery,
10+
DataRow, Field, FromBytes, Message, Protocol, ProtocolMessage, Query, ReadyForQuery,
1111
RowDescription, ToBytes, TransactionState,
1212
},
1313
state::State,
@@ -36,7 +36,7 @@ impl QueryEngine {
3636
// We need to run a query now.
3737
if context.in_transaction() {
3838
// Connect to one shard if not sharded or to all shards
39-
// for a cross-shard tranasction.
39+
// for a cross-shard transaction.
4040
if !self.connect_transaction(context).await? {
4141
return Ok(());
4242
}
@@ -123,8 +123,20 @@ impl QueryEngine {
123123
) -> Result<(), Error> {
124124
self.streaming = message.streaming();
125125

126+
let should_rewrite_for_display_table =
127+
if let Some(route) = context.client_request.route.as_ref() {
128+
route.display_table()
129+
} else {
130+
false
131+
};
132+
126133
let code = message.code();
127134
let payload = if code == 'T' {
135+
if should_rewrite_for_display_table {
136+
let mut fields = RowDescription::from_bytes(message.payload()).unwrap().fields.to_vec();
137+
fields.push(Field::text("Shard"));
138+
message = RowDescription::new(&fields).message()?;
139+
}
128140
Some(message.payload())
129141
} else {
130142
None
@@ -152,6 +164,40 @@ impl QueryEngine {
152164
self.pending_explain = None;
153165
}
154166

167+
168+
if code == 'D' {
169+
if should_rewrite_for_display_table {
170+
let mut dr = DataRow::from_bytes(message.payload()).unwrap();
171+
let col = dr.column(1).unwrap();
172+
173+
let shard_map = self.backend.forward_with_shard();
174+
let table_lookup = std::str::from_utf8(&col).unwrap();
175+
176+
if let Some(map) = shard_map {
177+
178+
if self.seen_tables.contains(table_lookup) {
179+
return Ok(())
180+
}
181+
182+
self.seen_tables.insert(table_lookup.to_string());
183+
184+
let mut new_col = String::new();
185+
for (i, val) in map[table_lookup].iter().enumerate() {
186+
if i > 0 {
187+
new_col.push_str(", ")
188+
}
189+
new_col.push_str(&val.to_string());
190+
}
191+
dr.add(new_col);
192+
} else {
193+
dr.add(None);
194+
}
195+
196+
message = dr.message()?;
197+
Some(message.payload());
198+
}
199+
}
200+
155201
// Messages that we need to send to the client immediately.
156202
// ReadyForQuery (B) | CopyInResponse (B) | ErrorResponse(B) | NoticeResponse(B) | NotificationResponse (B)
157203
let flush = matches!(code, 'Z' | 'G' | 'E' | 'N' | 'A')

pgdog/src/frontend/router/parser/query/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,17 @@ impl QueryParser {
108108
Command::default()
109109
};
110110

111+
// Check if we are executing \dt command
112+
if let Command::Query(route)= &mut command {
113+
let q = context.query().unwrap();
114+
if q.contains("pg_catalog.pg_class")
115+
&& q.contains("pg_catalog.pg_namespace")
116+
&& q.contains("relkind")
117+
&& q.contains("pg_toast") {
118+
route.set_display_table(true);
119+
}
120+
}
121+
111122
if let Command::Query(route) = &mut command {
112123
if route.is_cross_shard() && context.shards == 1 {
113124
context

pgdog/src/frontend/router/parser/route.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fmt::Display, ops::Deref};
1+
use std::{fmt::Display, ops::Deref, collections::HashMap};
22

33
use lazy_static::lazy_static;
44

@@ -90,6 +90,8 @@ pub struct Route {
9090
rollback_savepoint: bool,
9191
search_path_driven: bool,
9292
schema_changed: bool,
93+
display_table: bool,
94+
table_shard_map: Option<HashMap<String, Vec<usize>>>,
9395
}
9496

9597
impl Display for Route {
@@ -326,6 +328,25 @@ impl Route {
326328
ShardSource::Table(TableReason::Omni) | ShardSource::RoundRobin(RoundRobinReason::Omni)
327329
)
328330
}
331+
pub fn set_display_table(&mut self, v: bool) {
332+
self.display_table = v;
333+
}
334+
335+
pub fn display_table(&self) -> bool {
336+
self.display_table
337+
}
338+
339+
pub fn table_shard_map(&self) -> Option<HashMap<String, Vec<usize>>> {
340+
if self.table_shard_map == None {
341+
Some(HashMap::new())
342+
} else {
343+
self.table_shard_map.clone()
344+
}
345+
}
346+
347+
pub fn set_table_shard_map(&mut self, map: Option<HashMap<String, Vec<usize>>>) {
348+
self.table_shard_map = map;
349+
}
329350
}
330351

331352
/// Shard source.

pgdog/src/net/messages/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,18 @@ impl Message {
256256
pub fn transaction_error(&self) -> bool {
257257
self.code() == 'Z' && self.payload[5] as char == 'E'
258258
}
259+
260+
pub fn table_name_from_dt(&self) -> Result<Option<String>, Error> {
261+
if self.code() != 'D'{
262+
return Ok(None);
263+
}
264+
let byte_name = DataRow::from_bytes(self.payload()).unwrap().column(1);
265+
266+
let table_name = std::str::from_utf8(&byte_name.unwrap())?.to_string();
267+
268+
return Ok(Some(table_name))
269+
}
270+
259271
}
260272

261273
/// Check that the message we received is what we expected.

0 commit comments

Comments
 (0)