-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathcephfs-mds-shard.py
141 lines (126 loc) · 6.24 KB
/
cephfs-mds-shard.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
# Matthew Hutchinson
# 45Drives
# Version 2 - May 09/24
import os
import json
import time
import argparse
import subprocess
import re
import sys
# Find pinned directories
def find_non_negative_ceph_dir_pin(starting_directory):
non_negative_dirs = []
def search_directory(directory):
try:
# Get ceph.dir.pin attribute value
output = subprocess.run(["getfattr", "-n", "ceph.dir.pin", directory], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, check=True)
pin_value = output.stdout.strip().split("=")[-1]
if pin_value != '"-1"':
non_negative_dirs.append((directory, pin_value))
for sub_dir in os.listdir(directory):
sub_dir_path = os.path.join(directory, sub_dir)
if os.path.isdir(sub_dir_path):
search_directory(sub_dir_path)
except subprocess.CalledProcessError as e:
pass # Ignore errors for directories without ceph.dir.pin attribute
# Start the search from the subdirectories of the starting directory
for sub_dir in os.listdir(starting_directory):
sub_dir_path = os.path.join(starting_directory, sub_dir)
if os.path.isdir(sub_dir_path):
search_directory(sub_dir_path)
return non_negative_dirs
# Removes pin on Dirs
def repin_directories(non_negative_directories):
for directory, pin_value in non_negative_directories:
print(f"Directory: {directory}, ceph.dir.pin value: {pin_value}")
if non_negative_directories:
choice = input("Do you want to remove the pin on these directories? (yes/no): ")
if choice.lower() == "yes":
for directory, _ in non_negative_directories:
subprocess.run(["setfattr", "-n", "ceph.dir.pin", "-v", "-1", directory])
print("Directories repinned successfully.")
else:
print("No changes made.")
# List pinned Dirs
def list_dir(starting_directory):
non_negative_directories = find_non_negative_ceph_dir_pin(starting_directory)
if non_negative_directories:
print("Directories that are already pinned:")
for directory, pin_value in non_negative_directories:
print(f"Directory: {directory}, ceph.dir.pin value: {pin_value}")
else:
print("No directories pinned found.")
return (non_negative_directories)
# Main fuction to run
def main():
parser = argparse.ArgumentParser(description='Shard directories and pin each to a different MDS.')
parser.add_argument('-d', '--dir', help='The top-level directory to shard.')
parser.add_argument('-D', '--dry-run', action='store_true', help='Run the script in dry run mode. No actions will be performed.')
parser.add_argument('-F', '--force', action='store_true', help='Ignore existing pins')
parser.add_argument('-l', '--list', action='store_true', help='Lists pinned directories')
parser.add_argument('-r', '--remove', action='store_true', help='Remove pin on pinned directories')
parser.add_argument('-m', '--next_mds', help='Specify the next MDS to start with')
args = parser.parse_args()
if args.dir:
# wait for health ok
while not "HEALTH_OK" in subprocess.check_output("ceph health 2>/dev/null", shell=True).decode():
print("Waiting 10s for HEALTH_OK...")
time.sleep(10)
# List Pinned Directories
if args.list and not args.remove:
dirs_to_repin=list_dir(args.dir)
# Removed Pins from Dirs
if args.remove:
dirs_to_repin=list_dir(args.dir)
repin_directories(dirs_to_repin)
# get max_mds info
fs_dump = subprocess.check_output("ceph fs dump --format json 2>/dev/null", shell=True)
max_mds = json.loads(fs_dump)["filesystems"][0]["mdsmap"]["max_mds"]
# loop through the dirs and pin to available MDS
dirs = sorted([d for d in os.listdir(args.dir) if os.path.isdir(os.path.join(args.dir, d))])
# Specify what MDS to start with, if none start at 0
if args.next_mds and int(args.next_mds) < max_mds:
next_mds = int(args.next_mds) #if args.next_mds > max_mds else 0
elif args.next_mds:
choice = input("MDS defined is higher then Max MDS, Do you want to start at 0? (yes/no): ")
if choice.lower() == "yes":
next_mds = 0
else:
sys.exit()
else:
next_mds = 0
if not (args.list or args.remove):
for dir in dirs:
full_dir_path = os.path.join(args.dir, dir)
# check if MDS already pinned
pinned_mds = subprocess.check_output(f'getfattr -n ceph.dir.pin "{full_dir_path}" 2>/dev/null', shell=True).decode()
if "ceph.dir.pin" in pinned_mds:
current_mds = re.search('ceph.dir.pin="(.*)"', pinned_mds).group(1)
if current_mds != "-1" and not args.force :
print(f"{dir} is already pinned by MDS {current_mds}")
else:
if args.dry_run:
# print the action to be performed
print(f"Would pin {dir} to {next_mds} - Remove dry run flag to set")
else:
# pin the dir
print(f"Pinning {dir} to MDS {next_mds}")
subprocess.run(f'setfattr -n ceph.dir.pin -v {next_mds} "{full_dir_path}"', shell=True)
next_mds = str((int(next_mds) + 1) % max_mds)
time.sleep(1)
else:
if args.dry_run:
# print the action to be performed
print(f"Would pin {dir} to {next_mds} - Remove dry run flag to set")
else:
# If ceph.dir.pin is not present, pin the dir
print(f"Pinning {dir} to MDS {next_mds}")
subprocess.run(f'setfattr -n ceph.dir.pin -v {next_mds} "{full_dir_path}"', shell=True)
next_mds = str((int(next_mds) + 1) % max_mds)
time.sleep(1)
else:
parser.print_help()
if __name__ == "__main__":
main()