c4fb0432ae
GitOrigin-RevId: 3fc1143a04da49a92c3663813c6a0c1e8ccd477f
223 lines
6.6 KiB
Rust
223 lines
6.6 KiB
Rust
extern crate clap;
|
|
extern crate posix_mq;
|
|
extern crate libc;
|
|
extern crate nix;
|
|
|
|
use clap::{App, SubCommand, Arg, ArgMatches, AppSettings};
|
|
use posix_mq::{Name, Queue, Message};
|
|
use std::fs::{read_dir, File};
|
|
use std::io::{self, Read, Write};
|
|
use std::process::exit;
|
|
|
|
/// Lists the entries of `/dev/mqueue`, printing each entry's name (prefixed
/// with `/`) followed by the full text content of its file.
///
/// # Panics
///
/// Panics if the directory cannot be read, if a directory entry cannot be
/// resolved, or if an entry's file cannot be opened or read as UTF-8 text.
fn run_ls() {
    let entries = read_dir("/dev/mqueue").expect("Could not read message queues");

    for entry in entries {
        let queue_path = entry.unwrap().path();

        // Read the whole file into memory; it is printed verbatim below.
        let mut status = String::new();
        File::open(&queue_path)
            .expect("Could not open queue file")
            .read_to_string(&mut status)
            .expect("Could not read queue file");

        // The final path component is used as the queue name.
        let last_component = queue_path.components().next_back().unwrap();
        let queue_name = last_component.as_os_str().to_string_lossy();

        println!("/{}: {}", queue_name, status);
    }
}
|
|
|
|
fn run_inspect(queue_name: &str) {
|
|
let name = Name::new(queue_name).expect("Invalid queue name");
|
|
let queue = Queue::open(name).expect("Could not open queue");
|
|
|
|
println!("Queue {}:\n", queue_name);
|
|
println!("Max. message size: {} bytes", queue.max_size());
|
|
println!("Max. # of pending messages: {}", queue.max_pending());
|
|
}
|
|
|
|
fn run_create(cmd: &ArgMatches) {
|
|
if let Some(rlimit) = cmd.value_of("rlimit") {
|
|
set_rlimit(rlimit.parse().expect("Invalid rlimit value"));
|
|
}
|
|
|
|
let name = Name::new(cmd.value_of("queue").unwrap())
|
|
.expect("Invalid queue name");
|
|
|
|
let max_pending: i64 = cmd.value_of("max-pending").unwrap().parse().unwrap();
|
|
let max_size: i64 = cmd.value_of("max-size").unwrap().parse().unwrap();
|
|
|
|
let queue = Queue::create(name, max_pending, max_size * 1024);
|
|
|
|
match queue {
|
|
Ok(_) => println!("Queue created successfully"),
|
|
Err(e) => {
|
|
writeln!(io::stderr(), "Could not create queue: {}", e).ok();
|
|
exit(1);
|
|
},
|
|
};
|
|
}
|
|
|
|
fn run_receive(queue_name: &str) {
|
|
let name = Name::new(queue_name).expect("Invalid queue name");
|
|
let queue = Queue::open(name).expect("Could not open queue");
|
|
|
|
let message = match queue.receive() {
|
|
Ok(msg) => msg,
|
|
Err(e) => {
|
|
writeln!(io::stderr(), "Failed to receive message: {}", e).ok();
|
|
exit(1);
|
|
}
|
|
};
|
|
|
|
// Attempt to write the message out as a string, but write out raw bytes if it turns out to not
|
|
// be UTF-8 encoded data.
|
|
match String::from_utf8(message.data.clone()) {
|
|
Ok(string) => println!("{}", string),
|
|
Err(_) => {
|
|
writeln!(io::stderr(), "Message not UTF-8 encoded!").ok();
|
|
io::stdout().write(message.data.as_ref()).ok();
|
|
}
|
|
};
|
|
}
|
|
|
|
fn run_send(queue_name: &str, content: &str) {
|
|
let name = Name::new(queue_name).expect("Invalid queue name");
|
|
let queue = Queue::open(name).expect("Could not open queue");
|
|
|
|
let message = Message {
|
|
data: content.as_bytes().to_vec(),
|
|
priority: 0,
|
|
};
|
|
|
|
match queue.send(&message) {
|
|
Ok(_) => (),
|
|
Err(e) => {
|
|
writeln!(io::stderr(), "Could not send message: {}", e).ok();
|
|
exit(1);
|
|
}
|
|
}
|
|
}
|
|
|
|
fn run_rlimit() {
|
|
let mut rlimit = libc::rlimit {
|
|
rlim_cur: 0,
|
|
rlim_max: 0,
|
|
};
|
|
|
|
let mut errno = 0;
|
|
unsafe {
|
|
let res = libc::getrlimit(libc::RLIMIT_MSGQUEUE, &mut rlimit);
|
|
if res != 0 {
|
|
errno = nix::errno::errno();
|
|
}
|
|
};
|
|
|
|
if errno != 0 {
|
|
writeln!(io::stderr(), "Could not get message queue rlimit: {}", errno).ok();
|
|
} else {
|
|
println!("Message queue rlimit:");
|
|
println!("Current limit: {}", rlimit.rlim_cur);
|
|
println!("Maximum limit: {}", rlimit.rlim_max);
|
|
}
|
|
}
|
|
|
|
fn set_rlimit(new_limit: u64) {
|
|
let rlimit = libc::rlimit {
|
|
rlim_cur: new_limit,
|
|
rlim_max: new_limit,
|
|
};
|
|
|
|
let mut errno: i32 = 0;
|
|
unsafe {
|
|
let res = libc::setrlimit(libc::RLIMIT_MSGQUEUE, &rlimit);
|
|
if res != 0 {
|
|
errno = nix::errno::errno();
|
|
}
|
|
}
|
|
|
|
match errno {
|
|
0 => println!("Set RLIMIT_MSGQUEUE hard limit to {}", new_limit),
|
|
_ => {
|
|
// Not mapping these error codes to messages for now, the user can
|
|
// look up the meaning in setrlimit(2).
|
|
panic!("Could not set hard limit: {}", errno);
|
|
}
|
|
};
|
|
}
|
|
|
|
fn main() {
|
|
let ls = SubCommand::with_name("ls").about("list message queues");
|
|
|
|
let queue_arg = Arg::with_name("queue").required(true).takes_value(true);
|
|
|
|
let rlimit_arg = Arg::with_name("rlimit")
|
|
.help("RLIMIT_MSGQUEUE to set for this command")
|
|
.long("rlimit")
|
|
.takes_value(true);
|
|
|
|
let inspect = SubCommand::with_name("inspect")
|
|
.about("inspect details about a queue")
|
|
.arg(&queue_arg);
|
|
|
|
let create = SubCommand::with_name("create")
|
|
.about("Create a new queue")
|
|
.arg(&queue_arg)
|
|
.arg(&rlimit_arg)
|
|
.arg(Arg::with_name("max-size")
|
|
.help("maximum message size (in kB)")
|
|
.long("max-size")
|
|
.required(true)
|
|
.takes_value(true))
|
|
.arg(Arg::with_name("max-pending")
|
|
.help("maximum # of pending messages")
|
|
.long("max-pending")
|
|
.required(true)
|
|
.takes_value(true));
|
|
|
|
let receive = SubCommand::with_name("receive")
|
|
.about("Receive a message from a queue")
|
|
.arg(&queue_arg);
|
|
|
|
let send = SubCommand::with_name("send")
|
|
.about("Send a message to a queue")
|
|
.arg(&queue_arg)
|
|
.arg(Arg::with_name("message")
|
|
.help("the message to send")
|
|
.required(true));
|
|
|
|
let rlimit = SubCommand::with_name("rlimit")
|
|
.about("Get the message queue rlimit")
|
|
.setting(AppSettings::SubcommandRequiredElseHelp);
|
|
|
|
let matches = App::new("mq")
|
|
.setting(AppSettings::SubcommandRequiredElseHelp)
|
|
.version("1.0.0")
|
|
.about("Administrate and inspect POSIX message queues")
|
|
.subcommand(ls)
|
|
.subcommand(inspect)
|
|
.subcommand(create)
|
|
.subcommand(receive)
|
|
.subcommand(send)
|
|
.subcommand(rlimit)
|
|
.get_matches();
|
|
|
|
match matches.subcommand() {
|
|
("ls", _) => run_ls(),
|
|
("inspect", Some(cmd)) => run_inspect(cmd.value_of("queue").unwrap()),
|
|
("create", Some(cmd)) => run_create(cmd),
|
|
("receive", Some(cmd)) => run_receive(cmd.value_of("queue").unwrap()),
|
|
("send", Some(cmd)) => run_send(
|
|
cmd.value_of("queue").unwrap(),
|
|
cmd.value_of("message").unwrap()
|
|
),
|
|
("rlimit", _) => run_rlimit(),
|
|
_ => unimplemented!(),
|
|
}
|
|
}
|