Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions mobile_config_cli/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
cmds::gateway::{GatewayInfo, GatewayInfoStream},
cmds::gateway::{GatewayInfo, GatewayInfoStream, GatewayInfoStreamV3},
current_timestamp, NetworkKeyRole, Result,
};

Expand All @@ -13,7 +13,7 @@ use helium_proto::{
AuthorizationListResV1, AuthorizationVerifyReqV1, AuthorizationVerifyResV1,
CarrierIncentivePromotionListReqV1, CarrierIncentivePromotionListResV1, EntityVerifyReqV1,
EntityVerifyResV1, GatewayInfoAtTimestampReqV1, GatewayInfoBatchReqV1, GatewayInfoReqV1,
GatewayInfoResV2, GatewayInfoStreamResV2,
GatewayInfoResV2, GatewayInfoStreamReqV3, GatewayInfoStreamResV2, GatewayInfoStreamResV3,
},
Message,
};
Expand Down Expand Up @@ -290,6 +290,42 @@ impl GatewayClient {
.ok_or_else(|| anyhow::anyhow!("gateway not found"))?;
GatewayInfo::try_from(info)
}

pub async fn info_stream_v3(
&mut self,
batch_size: u32,
keypair: &Keypair,
) -> Result<GatewayInfoStreamV3> {
let mut request = GatewayInfoStreamReqV3 {
batch_size,
signer: keypair.public_key().into(),
signature: vec![],
device_types: vec![],
min_updated_at: 0,
min_location_changed_at: 0,
};
request.signature = request.sign(keypair)?;
let config_pubkey = self.server_pubkey.clone();
let stream = self
.client
.info_stream_v3(request)
.await?
.into_inner()
.filter_map(|res| async move { res.ok() })
.map(move |res| (res, config_pubkey.clone()))
.filter_map(|(res, pubkey)| async move {
match res.verify(&pubkey) {
Ok(()) => Some(res),
Err(err) => {
tracing::error!(?err, "Response verification failed");
None
}
}
})
.boxed();

Ok(stream)
}
}

pub trait MsgSign: Message + std::clone::Clone {
Expand Down Expand Up @@ -318,6 +354,7 @@ impl_sign!(EntityVerifyReqV1, signature);
impl_sign!(GatewayInfoReqV1, signature);
impl_sign!(GatewayInfoBatchReqV1, signature);
impl_sign!(GatewayInfoAtTimestampReqV1, signature);
impl_sign!(GatewayInfoStreamReqV3, signature);
impl_sign!(CarrierIncentivePromotionListReqV1, signature);

pub trait MsgVerify: Message + std::clone::Clone {
Expand Down Expand Up @@ -348,4 +385,5 @@ impl_verify!(AuthorizationListResV1, signature);
impl_verify!(EntityVerifyResV1, signature);
impl_verify!(GatewayInfoResV2, signature);
impl_verify!(GatewayInfoStreamResV2, signature);
impl_verify!(GatewayInfoStreamResV3, signature);
impl_verify!(CarrierIncentivePromotionListResV1, signature);
34 changes: 31 additions & 3 deletions mobile_config_cli/src/cmds/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use super::{GetHotspot, GetHotspotAtTimestamp, GetHotspotBatch, PathBufKeypair};
use super::{DeviceTypeCounts, GetHotspot, GetHotspotAtTimestamp, GetHotspotBatch, PathBufKeypair};
use crate::{client, Msg, PrettyJson, Result};
use angry_purple_tiger::AnimalName;
use futures::StreamExt;
use helium_crypto::PublicKey;
use helium_proto::services::mobile_config::{
GatewayInfoV2 as GatewayInfoProto, GatewayMetadataV2 as GatewayMetadataProto,
DeviceTypeV2, GatewayInfoStreamResV3, GatewayInfoV2 as GatewayInfoProto,
GatewayMetadataV2 as GatewayMetadataProto,
};
use mobile_config::gateway::service::info::{DeploymentInfo, DeviceType};
use serde::Serialize;
use std::str::FromStr;
use std::{collections::HashMap, str::FromStr};

pub type GatewayInfoStream = futures::stream::BoxStream<'static, GatewayInfo>;
pub type GatewayInfoStreamV3 = futures::stream::BoxStream<'static, GatewayInfoStreamResV3>;

#[derive(Debug, Serialize)]
pub struct GatewayInfo {
Expand Down Expand Up @@ -108,3 +110,29 @@ impl TryFrom<GatewayMetadataProto> for GatewayMetadata {
})
}
}

pub async fn device_type_counts(args: DeviceTypeCounts) -> Result<Msg> {
let mut client = client::GatewayClient::new(&args.config_host, &args.config_pubkey).await?;
let mut stream = client
.info_stream_v3(args.batch_size, &args.keypair.to_keypair()?)
.await?;

let mut counts: HashMap<String, u64> = HashMap::new();

while let Some(response) = stream.next().await {
for gateway in response.gateways {
let device_type = DeviceTypeV2::try_from(gateway.device_type)
.map(|dt| format!("{dt:?}"))
.unwrap_or_else(|_| format!("Unknown({})", gateway.device_type));
*counts.entry(device_type).or_default() += 1;
}
}

let total: u64 = counts.values().sum();
let output = serde_json::json!({
"counts": counts,
"total": total
});

Msg::ok(serde_json::to_string_pretty(&output)?)
}
14 changes: 14 additions & 0 deletions mobile_config_cli/src/cmds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,20 @@ pub enum GatewayCommands {
InfoBatch(GetHotspotBatch),
/// Retrieve the on-chain registered info for the hotspot at a timestamp
InfoAtTimestamp(GetHotspotAtTimestamp),
/// Stream all gateways and print counts by device type
DeviceTypeCounts(DeviceTypeCounts),
}

#[derive(Debug, Args)]
pub struct DeviceTypeCounts {
#[arg(short, long, default_value = "5000")]
pub batch_size: u32,
#[arg(from_global)]
pub keypair: PathBuf,
#[arg(from_global)]
pub config_host: String,
#[arg(from_global)]
pub config_pubkey: String,
}

#[derive(Debug, Args)]
Expand Down
3 changes: 3 additions & 0 deletions mobile_config_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub async fn handle_cli(cli: Cli) -> Result<Msg> {
cmds::GatewayCommands::Info(args) => gateway::info(args).await,
cmds::GatewayCommands::InfoBatch(args) => gateway::info_batch(args).await,
cmds::GatewayCommands::InfoAtTimestamp(args) => gateway::info_at_timestamp(args).await,
cmds::GatewayCommands::DeviceTypeCounts(args) => {
gateway::device_type_counts(args).await
}
},
}
}