Coverage Report

Created: 2022-11-10 19:56

/home/runner/work/creditcoin/creditcoin/node/src/service/nonce_monitor.rs
Line
Count
Source (jump to first uncovered line)
1
use std::{convert::TryInto, time::Duration};
2
3
use codec::Decode;
4
use creditcoin_node_runtime::AccountId;
5
use jsonrpc_core::{futures::channel::mpsc, futures::join, Failure, Response, Success};
6
use sc_client_api::Backend;
7
use sc_service::{Arc, RpcHandlers, RpcSession};
8
use sp_keystore::CryptoStore;
9
use sp_runtime::{
10
  app_crypto::Ss58Codec, offchain::OffchainStorage, traits::IdentifyAccount, MultiSigner,
11
};
12
use substrate_prometheus_endpoint::Registry;
13
use thiserror::Error;
14
15
use crate::cli::NonceMonitorTarget;
16
17
use super::FullBackend;
18
19
0
#[derive(Debug, Error)]
20
#[error("{0}")]
21
enum Error {
22
  Serde(sc_telemetry::serde_json::Error),
23
  JsonRpc(jsonrpc_core::Error),
24
  Rpc(String),
25
  Codec(codec::Error),
26
  KeyStore(String),
27
  Signer(String),
28
}
29
30
0
async fn rpc_request(
31
0
  handlers: &RpcHandlers,
32
0
  request: &str,
33
0
) -> Result<jsonrpc_core::serde_json::Value, Error> {
34
0
  let (tx, _rx) = mpsc::unbounded();
35
0
  let session = RpcSession::new(tx);
36
37
0
  let response = handlers
38
0
    .rpc_query(&session, request)
39
0
    .await
40
0
    .ok_or_else(|| Error::Rpc("empty response".into()))?;
41
42
0
  let response: Response = jsonrpc_core::serde_json::from_str(&response).map_err(Error::Serde)?;
43
44
0
  let result = match response {
45
0
    Response::Single(out) => match out {
46
0
      jsonrpc_core::Output::Success(Success { result, .. }) => result,
47
0
      jsonrpc_core::Output::Failure(Failure { error, .. }) => {
48
0
        return Err(Error::JsonRpc(error))
49
      },
50
    },
51
    Response::Batch(_) => {
52
0
      unreachable!("we don't send any batch requests, so we cannot receive batch responses")
53
    },
54
  };
55
56
0
  Ok(result)
57
0
}
58
59
0
async fn get_on_chain_nonce(handlers: &RpcHandlers, acct: &AccountId) -> Result<u64, Error> {
60
0
  let request = format!(
61
0
    r#"{{
62
0
            "jsonrpc": "2.0",
63
0
            "method": "system_accountNextIndex",
64
0
            "params": ["{}"],
65
0
            "id": 0
66
0
        }}"#,
67
0
    acct.to_ss58check()
68
0
  );
69
70
0
  let result = rpc_request(handlers, &request).await?;
71
72
0
  result.as_u64().ok_or_else(|| Error::Rpc("expected u64 response".into()))
73
0
}
74
75
0
async fn get_off_chain_nonce_key(
76
0
  handlers: &RpcHandlers,
77
0
  acct: &AccountId,
78
0
) -> Result<Vec<u8>, Error> {
79
0
  let request = format!(
80
0
    r#"{{
81
0
            "jsonrpc": "2.0",
82
0
            "method": "task_getOffchainNonceKey",
83
0
            "params": ["{}"],
84
0
            "id": 0
85
0
        }}"#,
86
0
    acct.to_ss58check()
87
0
  );
88
89
0
  let result = rpc_request(handlers, &request).await?;
90
91
0
  let key: Vec<u8> = jsonrpc_core::serde_json::from_value(result).map_err(Error::Serde)?;
92
93
0
  Ok(key)
94
0
}
95
96
0
async fn get_off_chain_nonce(backend: &FullBackend, key: &[u8]) -> Result<Option<u64>, Error> {
97
0
  let off = backend
98
0
    .offchain_storage()
99
0
    .expect(
100
0
      "offchain storage must be accessible in a creditcoin node. \
101
0
        we only support the file-backed storage backend which always has offchain storage; qed",
102
0
    )
103
0
    .get(sp_offchain::STORAGE_PREFIX, key);
104
105
0
  let off = match off {
106
0
    Some(v) => v,
107
0
    None => return Ok(None),
108
  };
109
0
  let nonce = u32::decode(&mut off.as_slice()).map_err(Error::Codec)?;
110
111
0
  Ok(Some(nonce.into()))
112
0
}
113
114
type UIntGauge = substrate_prometheus_endpoint::prometheus::core::GenericGauge<
115
  substrate_prometheus_endpoint::prometheus::core::AtomicU64,
116
>;
117
118
0
fn register_u64_gauge(registry: &Registry, name: &str, help: &str) -> UIntGauge {
119
0
  substrate_prometheus_endpoint::register(
120
0
    UIntGauge::new(name, help).expect("gauge creation should not fail"),
121
0
    registry,
122
0
  )
123
0
  .expect("registering prometheus gauge should not fail")
124
0
}
125
126
/// `Arc`-wrapped [`CryptoStore`] trait object used to look up the authority's keys.
type Keystore = Arc<dyn CryptoStore>;
127
128
0
async fn get_authority_account(
129
0
  target: NonceMonitorTarget,
130
0
  keystore: &Keystore,
131
0
) -> Result<Option<AccountId>, Error> {
132
0
  Ok(match target {
133
    NonceMonitorTarget::Auto => {
134
0
      let keys = keystore
135
0
        .keys(sp_runtime::KeyTypeId(*b"ctcs"))
136
0
        .await
137
0
        .map_err(|e| Error::KeyStore(e.to_string()))?;
138
0
      keys.into_iter()
139
0
        .next()
140
0
        .map(|key| {
141
0
          Ok::<_, Error>(MultiSigner::Sr25519(sp_core::sr25519::Public::from_raw(
142
0
            key.1.try_into().map_err(|e| {
143
0
              Error::Signer(format!(
144
0
                "Invalid authority account from public key: {}",
145
0
                hex::encode(e)
146
0
              ))
147
0
            })?,
148
          )))
149
0
        })
150
0
        .transpose()?
151
0
        .map(|signer| signer.into_account())
152
    },
153
154
0
    NonceMonitorTarget::Account(acct) => Some(acct),
155
  })
156
0
}
157
158
/// Delay between two consecutive nonce polls in [`task`]; while waiting for an
/// authority account to appear, [`task`] sleeps for twice this interval.
const POLL_INTERVAL: Duration = Duration::from_secs(30);
159
160
pub(super) struct TaskArgs {
161
  pub(super) registry: Registry,
162
  pub(super) monitor_target: NonceMonitorTarget,
163
  pub(super) handlers: RpcHandlers,
164
  pub(super) backend: Arc<FullBackend>,
165
  pub(super) keystore: Keystore,
166
}
167
168
0
pub(super) async fn task(
169
0
  TaskArgs { registry, monitor_target, handlers, backend, keystore }: TaskArgs,
170
0
) {
171
0
  let offchain_gauge = register_u64_gauge(
172
0
    &registry,
173
0
    "authority_offchain_nonce",
174
0
    "the nonce for the authority in offchain storage",
175
0
  );
176
0
  let onchain_gauge = register_u64_gauge(
177
0
    &registry,
178
0
    "authority_onchain_nonce",
179
0
    "the nonce for the authority in onchain storage",
180
0
  );
181
182
0
  let nonce_account =
183
    loop {
184
0
      match get_authority_account(monitor_target.clone(), &keystore).await {
185
0
        Ok(Some(acct)) => break acct,
186
        Ok(None) => {
187
0
          log::info!("No authority account found");
188
0
          tokio::time::sleep(POLL_INTERVAL * 2).await
189
        },
190
0
        Err(e) => {
191
0
          log::error!("Encountered error when trying to get authority account for monitoring: {e}");
192
0
          return;
193
        },
194
      }
195
    };
196
197
0
  let key = match get_off_chain_nonce_key(&handlers, &nonce_account).await {
198
0
    Ok(key) => key,
199
0
    Err(e) => {
200
0
      log::error!("Failed to get key for the offchain nonce of {nonce_account}: {e}");
201
0
      return;
202
    },
203
  };
204
205
  loop {
206
0
    let (onchain, offchain) = join!(
207
0
      get_on_chain_nonce(&handlers, &nonce_account),
208
0
      get_off_chain_nonce(&backend, &key)
209
0
    );
210
211
0
    match (onchain, offchain) {
212
0
      (Ok(on), Ok(off)) => {
213
0
        log::info!("Onchain: {}, offchain: {:?}", on, off);
214
0
        offchain_gauge.set(off.unwrap_or(on));
215
0
        onchain_gauge.set(on);
216
      },
217
0
      (Err(e), Err(e2)) => {
218
0
        log::error!("Errors during nonce monitoring: {e} ; {e2}");
219
      },
220
0
      (Err(e), _) | (_, Err(e)) => {
221
0
        log::error!("Error during nonce monitoring: {e}");
222
      },
223
    }
224
0
    tokio::time::sleep(POLL_INTERVAL).await;
225
  }
226
0
}