/home/runner/work/creditcoin/creditcoin/node/src/service/nonce_monitor.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use std::{convert::TryInto, time::Duration}; |
2 | | |
3 | | use codec::Decode; |
4 | | use creditcoin_node_runtime::AccountId; |
5 | | use jsonrpc_core::{futures::channel::mpsc, futures::join, Failure, Response, Success}; |
6 | | use sc_client_api::Backend; |
7 | | use sc_service::{Arc, RpcHandlers, RpcSession}; |
8 | | use sp_keystore::CryptoStore; |
9 | | use sp_runtime::{ |
10 | | app_crypto::Ss58Codec, offchain::OffchainStorage, traits::IdentifyAccount, MultiSigner, |
11 | | }; |
12 | | use substrate_prometheus_endpoint::Registry; |
13 | | use thiserror::Error; |
14 | | |
15 | | use crate::cli::NonceMonitorTarget; |
16 | | |
17 | | use super::FullBackend; |
18 | | |
19 | 0 | #[derive(Debug, Error)] |
20 | | #[error("{0}")] |
21 | | enum Error { |
22 | | Serde(sc_telemetry::serde_json::Error), |
23 | | JsonRpc(jsonrpc_core::Error), |
24 | | Rpc(String), |
25 | | Codec(codec::Error), |
26 | | KeyStore(String), |
27 | | Signer(String), |
28 | | } |
29 | | |
30 | 0 | async fn rpc_request( |
31 | 0 | handlers: &RpcHandlers, |
32 | 0 | request: &str, |
33 | 0 | ) -> Result<jsonrpc_core::serde_json::Value, Error> { |
34 | 0 | let (tx, _rx) = mpsc::unbounded(); |
35 | 0 | let session = RpcSession::new(tx); |
36 | | |
37 | 0 | let response = handlers |
38 | 0 | .rpc_query(&session, request) |
39 | 0 | .await |
40 | 0 | .ok_or_else(|| Error::Rpc("empty response".into()))?; |
41 | | |
42 | 0 | let response: Response = jsonrpc_core::serde_json::from_str(&response).map_err(Error::Serde)?; |
43 | | |
44 | 0 | let result = match response { |
45 | 0 | Response::Single(out) => match out { |
46 | 0 | jsonrpc_core::Output::Success(Success { result, .. }) => result, |
47 | 0 | jsonrpc_core::Output::Failure(Failure { error, .. }) => { |
48 | 0 | return Err(Error::JsonRpc(error)) |
49 | | }, |
50 | | }, |
51 | | Response::Batch(_) => { |
52 | 0 | unreachable!("we don't send any batch requests, so we cannot receive batch responses") |
53 | | }, |
54 | | }; |
55 | | |
56 | 0 | Ok(result) |
57 | 0 | } |
58 | | |
59 | 0 | async fn get_on_chain_nonce(handlers: &RpcHandlers, acct: &AccountId) -> Result<u64, Error> { |
60 | 0 | let request = format!( |
61 | 0 | r#"{{ |
62 | 0 | "jsonrpc": "2.0", |
63 | 0 | "method": "system_accountNextIndex", |
64 | 0 | "params": ["{}"], |
65 | 0 | "id": 0 |
66 | 0 | }}"#, |
67 | 0 | acct.to_ss58check() |
68 | 0 | ); |
69 | | |
70 | 0 | let result = rpc_request(handlers, &request).await?; |
71 | | |
72 | 0 | result.as_u64().ok_or_else(|| Error::Rpc("expected u64 response".into())) |
73 | 0 | } |
74 | | |
75 | 0 | async fn get_off_chain_nonce_key( |
76 | 0 | handlers: &RpcHandlers, |
77 | 0 | acct: &AccountId, |
78 | 0 | ) -> Result<Vec<u8>, Error> { |
79 | 0 | let request = format!( |
80 | 0 | r#"{{ |
81 | 0 | "jsonrpc": "2.0", |
82 | 0 | "method": "task_getOffchainNonceKey", |
83 | 0 | "params": ["{}"], |
84 | 0 | "id": 0 |
85 | 0 | }}"#, |
86 | 0 | acct.to_ss58check() |
87 | 0 | ); |
88 | | |
89 | 0 | let result = rpc_request(handlers, &request).await?; |
90 | | |
91 | 0 | let key: Vec<u8> = jsonrpc_core::serde_json::from_value(result).map_err(Error::Serde)?; |
92 | | |
93 | 0 | Ok(key) |
94 | 0 | } |
95 | | |
96 | 0 | async fn get_off_chain_nonce(backend: &FullBackend, key: &[u8]) -> Result<Option<u64>, Error> { |
97 | 0 | let off = backend |
98 | 0 | .offchain_storage() |
99 | 0 | .expect( |
100 | 0 | "offchain storage must be accessible in a creditcoin node. \ |
101 | 0 | we only support the file-backed storage backend which always has offchain storage; qed", |
102 | 0 | ) |
103 | 0 | .get(sp_offchain::STORAGE_PREFIX, key); |
104 | | |
105 | 0 | let off = match off { |
106 | 0 | Some(v) => v, |
107 | 0 | None => return Ok(None), |
108 | | }; |
109 | 0 | let nonce = u32::decode(&mut off.as_slice()).map_err(Error::Codec)?; |
110 | | |
111 | 0 | Ok(Some(nonce.into())) |
112 | 0 | } |
113 | | |
114 | | type UIntGauge = substrate_prometheus_endpoint::prometheus::core::GenericGauge< |
115 | | substrate_prometheus_endpoint::prometheus::core::AtomicU64, |
116 | | >; |
117 | | |
118 | 0 | fn register_u64_gauge(registry: &Registry, name: &str, help: &str) -> UIntGauge { |
119 | 0 | substrate_prometheus_endpoint::register( |
120 | 0 | UIntGauge::new(name, help).expect("gauge creation should not fail"), |
121 | 0 | registry, |
122 | 0 | ) |
123 | 0 | .expect("registering prometheus gauge should not fail") |
124 | 0 | } |
125 | | |
126 | | type Keystore = Arc<dyn CryptoStore>; |
127 | | |
128 | 0 | async fn get_authority_account( |
129 | 0 | target: NonceMonitorTarget, |
130 | 0 | keystore: &Keystore, |
131 | 0 | ) -> Result<Option<AccountId>, Error> { |
132 | 0 | Ok(match target { |
133 | | NonceMonitorTarget::Auto => { |
134 | 0 | let keys = keystore |
135 | 0 | .keys(sp_runtime::KeyTypeId(*b"ctcs")) |
136 | 0 | .await |
137 | 0 | .map_err(|e| Error::KeyStore(e.to_string()))?; |
138 | 0 | keys.into_iter() |
139 | 0 | .next() |
140 | 0 | .map(|key| { |
141 | 0 | Ok::<_, Error>(MultiSigner::Sr25519(sp_core::sr25519::Public::from_raw( |
142 | 0 | key.1.try_into().map_err(|e| { |
143 | 0 | Error::Signer(format!( |
144 | 0 | "Invalid authority account from public key: {}", |
145 | 0 | hex::encode(e) |
146 | 0 | )) |
147 | 0 | })?, |
148 | | ))) |
149 | 0 | }) |
150 | 0 | .transpose()? |
151 | 0 | .map(|signer| signer.into_account()) |
152 | | }, |
153 | | |
154 | 0 | NonceMonitorTarget::Account(acct) => Some(acct), |
155 | | }) |
156 | 0 | } |
157 | | |
158 | | const POLL_INTERVAL: Duration = Duration::from_secs(30); |
159 | | |
160 | | pub(super) struct TaskArgs { |
161 | | pub(super) registry: Registry, |
162 | | pub(super) monitor_target: NonceMonitorTarget, |
163 | | pub(super) handlers: RpcHandlers, |
164 | | pub(super) backend: Arc<FullBackend>, |
165 | | pub(super) keystore: Keystore, |
166 | | } |
167 | | |
168 | 0 | pub(super) async fn task( |
169 | 0 | TaskArgs { registry, monitor_target, handlers, backend, keystore }: TaskArgs, |
170 | 0 | ) { |
171 | 0 | let offchain_gauge = register_u64_gauge( |
172 | 0 | ®istry, |
173 | 0 | "authority_offchain_nonce", |
174 | 0 | "the nonce for the authority in offchain storage", |
175 | 0 | ); |
176 | 0 | let onchain_gauge = register_u64_gauge( |
177 | 0 | ®istry, |
178 | 0 | "authority_onchain_nonce", |
179 | 0 | "the nonce for the authority in onchain storage", |
180 | 0 | ); |
181 | | |
182 | 0 | let nonce_account = |
183 | | loop { |
184 | 0 | match get_authority_account(monitor_target.clone(), &keystore).await { |
185 | 0 | Ok(Some(acct)) => break acct, |
186 | | Ok(None) => { |
187 | 0 | log::info!("No authority account found"); |
188 | 0 | tokio::time::sleep(POLL_INTERVAL * 2).await |
189 | | }, |
190 | 0 | Err(e) => { |
191 | 0 | log::error!("Encountered error when trying to get authority account for monitoring: {e}"); |
192 | 0 | return; |
193 | | }, |
194 | | } |
195 | | }; |
196 | | |
197 | 0 | let key = match get_off_chain_nonce_key(&handlers, &nonce_account).await { |
198 | 0 | Ok(key) => key, |
199 | 0 | Err(e) => { |
200 | 0 | log::error!("Failed to get key for the offchain nonce of {nonce_account}: {e}"); |
201 | 0 | return; |
202 | | }, |
203 | | }; |
204 | | |
205 | | loop { |
206 | 0 | let (onchain, offchain) = join!( |
207 | 0 | get_on_chain_nonce(&handlers, &nonce_account), |
208 | 0 | get_off_chain_nonce(&backend, &key) |
209 | 0 | ); |
210 | | |
211 | 0 | match (onchain, offchain) { |
212 | 0 | (Ok(on), Ok(off)) => { |
213 | 0 | log::info!("Onchain: {}, offchain: {:?}", on, off); |
214 | 0 | offchain_gauge.set(off.unwrap_or(on)); |
215 | 0 | onchain_gauge.set(on); |
216 | | }, |
217 | 0 | (Err(e), Err(e2)) => { |
218 | 0 | log::error!("Errors during nonce monitoring: {e} ; {e2}"); |
219 | | }, |
220 | 0 | (Err(e), _) | (_, Err(e)) => { |
221 | 0 | log::error!("Error during nonce monitoring: {e}"); |
222 | | }, |
223 | | } |
224 | 0 | tokio::time::sleep(POLL_INTERVAL).await; |
225 | | } |
226 | 0 | } |