// Copyright 2015 YP LLC.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package main
// Ceph RBD Docker VolumeDriver Plugin
//
// Run rbd-docker-plugin service, which creates a socket that can accept JSON
// HTTP POSTs from docker engine.
//
// Due to some issues using the go-ceph library for locking/unlocking, we
// reimplemented all functionality to use shell CLI commands via the 'rbd'
// executable. To re-enable old go-ceph functionality, use --go-ceph flag.
//
// System Requirements:
// - requires rbd CLI binary for shell operation (default)
// - requires ceph, rados and rbd development headers to use go-ceph
// yum install ceph-devel librados2-devel librbd1-devel
//
// Plugin name: rbd (yp-rbd? ceph-rbd?) -- now configurable via --name
//
// docker run --volume-driver=rbd -v imagename:/mnt/dir IMAGE [CMD]
//
// golang github code examples:
// - https://github.com/docker/docker/blob/master/experimental/plugins_volume.md
// - https://github.com/ceph/go-ceph
// - https://github.com/docker/go-plugins-helpers/tree/master/volume
// - https://github.com/calavera/docker-volume-glusterfs
// - https://github.com/AcalephStorage/docker-volume-ceph-rbd
import (
"errors"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/ceph/go-ceph/rados"
"github.com/ceph/go-ceph/rbd"
dkvolume "github.com/docker/go-plugins-helpers/volume"
)
// TODO: use versioned dependencies -- e.g. newest dkvolume already has breaking changes?
var (
imageNameRegexp = regexp.MustCompile(`^(([-_.[:alnum:]]+)/)?([-_.[:alnum:]]+)(@([0-9]+))?$`) // optional pool or size in image name
rbdUnmapBusyRegexp = regexp.MustCompile(`^exit status 16$`)
)
// Volume is the Docker concept which we map onto a Ceph RBD Image
type Volume struct {
name string // RBD Image name
device string // local host kernel device (e.g. /dev/rbd1)
locker string // track the lock name
fstype string
pool string
}
// TODO: finish modularizing and split out go-ceph and shell-cli implementations
//
// in progress: interface to abstract the ceph operations - either go-ceph lib or sh cli commands
type RbdImageDriver interface {
// shutdown()
// connect(pool string) error // ?? only go-ceph
rbdImageExists(pool, findName string) (bool, error)
createRBDImage(pool string, name string, size int, fstype string) error
rbdImageIsLocked(pool, name string) (bool, error)
lockImage(pool, imagename string) (string, error)
unlockImage(pool, imagename, locker string) error
removeRBDImage(pool, name string) error
renameRBDImage(pool, name, newname string) error
// mapImage(pool, name string)
// unmapImageDevice(device string)
// mountDevice(device, mount, fstype string)
// unmountDevice(device string)
}
//
// our driver type for impl func
type cephRBDVolumeDriver struct {
// - using default ceph cluster name ("ceph")
// - using default ceph config (/etc/ceph/<cluster>.conf)
//
// TODO: when starting, what if there are mounts already for RBD devices?
// do we ingest them as our own or ... currently fails if locked
//
// TODO: use a chan as semaphore instead of mutex in driver?
name string // unique name for plugin
cluster string // ceph cluster to use (default: ceph)
user string // ceph user to use (default: admin)
pool string // ceph pool to use (default: rbd)
root string // scratch dir for mounts for this plugin
config string // ceph config file to read
volumes map[string]*Volume // track locally mounted volumes
m *sync.Mutex // mutex to guard operations that change volume maps or use conn
useGoCeph bool // whether to setup/use go-ceph lib methods (default: false - use shell cli)
conn *rados.Conn // create a connection for each API operation
ioctx *rados.IOContext // context for requested pool
}
// newCephRBDVolumeDriver builds the driver struct, reads config file and connects to cluster
func newCephRBDVolumeDriver(pluginName, cluster, userName, defaultPoolName, rootBase, config string, useGoCeph bool) cephRBDVolumeDriver {
// the root mount dir will be based on docker default root and plugin name - pool added later per volume
mountDir := filepath.Join(rootBase, pluginName)
log.Printf("INFO: newCephRBDVolumeDriver: setting base mount dir=%s", mountDir)
// fill everything except the connection and context
driver := cephRBDVolumeDriver{
name: pluginName,
cluster: cluster,
user: userName,
pool: defaultPoolName,
root: mountDir,
config: config,
volumes: map[string]*Volume{},
m: &sync.Mutex{},
useGoCeph: useGoCeph,
}
return driver
}
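// Example construction (illustrative only; the rootBase and config paths
// below are assumptions, the other values mirror the documented defaults):
//   d := newCephRBDVolumeDriver("rbd", "ceph", "admin", "rbd", "/var/lib/docker-volumes", "/etc/ceph/ceph.conf", false)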
// Capabilities
// Scope: global - images managed using this plugin can be considered "global"
// TODO: make configurable
func (d cephRBDVolumeDriver) Capabilities(r dkvolume.Request) dkvolume.Response {
return dkvolume.Response{
Capabilities: dkvolume.Capability{
Scope: "global",
},
}
}
// ************************************************************
//
// Implement the Docker VolumeDriver API via dkvolume interface
//
// Using https://github.com/docker/go-plugins-helpers/tree/master/volume
//
// ************************************************************
// Create will ensure the RBD image requested is available. Plugin requires
// --create option to provision new RBD images.
//
// Docker Volume Create Options:
// size - in MB
// pool
// fstype
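//
// Example (illustrative; "foo" is a placeholder volume name, the driver
// name depends on the --name the plugin was started with, and creating a
// new image requires the --create option):
//   docker volume create -d rbd -o pool=rbd -o size=1024 -o fstype=xfs foo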
//
//
// POST /VolumeDriver.Create
//
// Request:
// {
// "Name": "volume_name",
// "Opts": {}
// }
// Instruct the plugin that the user wants to create a volume, given a user
// specified volume name. The plugin does not need to actually manifest the
// volume on the filesystem yet (until Mount is called).
//
// Response:
// { "Err": null }
// Respond with a string error if an error occurred.
//
func (d cephRBDVolumeDriver) Create(r dkvolume.Request) dkvolume.Response {
log.Printf("INFO: API Create(%q)", r)
d.m.Lock()
defer d.m.Unlock()
return d.createImage(r)
}
func (d cephRBDVolumeDriver) createImage(r dkvolume.Request) dkvolume.Response {
log.Printf("INFO: createImage(%q)", r)
fstype := *defaultImageFSType
// parse image name optional/default pieces
pool, name, size, err := d.parseImagePoolNameSize(r.Name)
if err != nil {
log.Printf("ERROR: parsing volume: %s", err)
return dkvolume.Response{Err: err.Error()}
}
// Options to override from `docker volume create -o OPT=VAL ...`
if r.Options["pool"] != "" {
pool = r.Options["pool"]
}
if r.Options["size"] != "" {
size, err = strconv.Atoi(r.Options["size"])
if err != nil {
log.Printf("WARN: using default size. unable to parse int from %s: %s", r.Options["size"], err)
size = *defaultImageSizeMB
}
}
if r.Options["fstype"] != "" {
fstype = r.Options["fstype"]
}
// check for mount
mount := d.mountpoint(pool, name)
// do we already know about this volume? return early
if _, found := d.volumes[mount]; found {
log.Println("INFO: Volume is already in known mounts: " + mount)
return dkvolume.Response{}
}
// otherwise, connect to Ceph and check ceph rbd api for it
if d.useGoCeph {
err = d.connect(pool)
if err != nil {
log.Printf("ERROR: unable to connect to ceph and access pool: %s", err)
return dkvolume.Response{Err: err.Error()}
}
defer d.shutdown()
}
exists, err := d.rbdImageExists(pool, name)
if err != nil {
log.Printf("ERROR: checking for RBD Image: %s", err)
return dkvolume.Response{Err: err.Error()}
}
if !exists {
if !*canCreateVolumes {
errString := fmt.Sprintf("Ceph RBD Image not found: %s", name)
log.Println("ERROR: " + errString)
return dkvolume.Response{Err: errString}
}
// try to create it ... use size and default fs-type
err = d.createRBDImage(pool, name, size, fstype)
if err != nil {
errString := fmt.Sprintf("Unable to create Ceph RBD Image(%s): %s", name, err)
log.Println("ERROR: " + errString)
return dkvolume.Response{Err: errString}
}
}
return dkvolume.Response{}
}
// POST /VolumeDriver.Remove
//
// Request:
// { "Name": "volume_name" }
// Remove a volume, given a user specified volume name.
//
// Response:
// { "Err": null }
// Respond with a string error if an error occurred.
//
func (d cephRBDVolumeDriver) Remove(r dkvolume.Request) dkvolume.Response {
log.Printf("INFO: API Remove(%s)", r)
d.m.Lock()
defer d.m.Unlock()
// parse full image name for optional/default pieces
pool, name, _, err := d.parseImagePoolNameSize(r.Name)
if err != nil {
log.Printf("ERROR: parsing volume: %s", err)
return dkvolume.Response{Err: err.Error()}
}
mount := d.mountpoint(pool, name)
// do we know about this volume? does it matter?
if _, found := d.volumes[mount]; !found {
log.Printf("WARN: Volume is not in known mounts: %s", mount)
}
// connect to Ceph and check ceph rbd api for it
if d.useGoCeph {
err = d.connect(pool)
if err != nil {
log.Printf("ERROR: unable to connect to ceph and access pool: %s", err)
return dkvolume.Response{Err: err.Error()}
}
defer d.shutdown()
}
exists, err := d.rbdImageExists(pool, name)
if err != nil {
log.Printf("ERROR: checking for RBD Image: %s", err)
return dkvolume.Response{Err: err.Error()}
}
if !exists {
errString := fmt.Sprintf("Ceph RBD Image not found: %s", name)
log.Println("ERROR: " + errString)
return dkvolume.Response{Err: errString}
}
// attempt to gain lock before remove - lock seems to disappear after rm (but not after rename)
locker, err := d.lockImage(pool, name)
if err != nil {
errString := fmt.Sprintf("Unable to lock image for remove: %s", name)
log.Println("ERROR: " + errString)
return dkvolume.Response{Err: errString}
}
// remove action can be: ignore, delete or rename
if removeActionFlag == "delete" {
// delete it (for real - destroy it ... )
err = d.removeRBDImage(pool, name)
if err != nil {
errString := fmt.Sprintf("Unable to remove Ceph RBD Image(%s): %s", name, err)
log.Println("ERROR: " + errString)
defer d.unlockImage(pool, name, locker)
return dkvolume.Response{Err: errString}
}
defer d.unlockImage(pool, name, locker)
} else if removeActionFlag == "rename" {
// just rename it (in case needed later, or can be culled via script)
err = d.renameRBDImage(pool, name, "zz_"+name)
if err != nil {
errString := fmt.Sprintf("Unable to rename with zz_ prefix: RBD Image(%s): %s", name, err)
log.Println("ERROR: " + errString)
// unlock by old name
defer d.unlockImage(pool, name, locker)
return dkvolume.Response{Err: errString}
}
// unlock by new name
defer d.unlockImage(pool, "zz_"+name, locker)
} else {
// ignore the remove call - but unlock ?
defer d.unlockImage(pool, name, locker)
}
delete(d.volumes, mount)
return dkvolume.Response{}
}
// Mount will Ceph Map the RBD image to the local kernel and create a mount
// point and mount the image.
//
// POST /VolumeDriver.Mount
//
// Request:
// { "Name": "volume_name" }
// Docker requires the plugin to provide a volume, given a user specified
// volume name. This is called once per container start.
//
// Response:
// { "Mountpoint": "/path/to/directory/on/host", "Err": null }
// Respond with the path on the host filesystem where the volume has been
// made available, and/or a string error if an error occurred.
//
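// Example (hypothetical root): for image "rbd/foo" the returned Mountpoint
// is the path built by mountpoint(), i.e. <rootBase>/<plugin-name>/rbd/foo.
//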
// TODO: utilize the new MountRequest.ID field to track volumes
func (d cephRBDVolumeDriver) Mount(r dkvolume.MountRequest) dkvolume.Response {
log.Printf("INFO: API Mount(%s)", r)
d.m.Lock()
defer d.m.Unlock()
// parse full image name for optional/default pieces
pool, name, _, err := d.parseImagePoolNameSize(r.Name)
if err != nil {
log.Printf("ERROR: parsing volume: %s", err)
return dkvolume.Response{Err: err.Error()}
}
mount := d.mountpoint(pool, name)
// FIXME: this is failing - see error below - for now we just attempt to grab a lock
// check that the image is not locked already
//locked, err := d.rbdImageIsLocked(name)
//if locked || err != nil {
// log.Printf("ERROR: checking for RBD Image(%s) lock: %s", name, err)
// return dkvolume.Response{Err: "RBD Image locked"}
//}
// attempt to lock
locker, err := d.lockImage(pool, name)
if err != nil {
log.Printf("ERROR: locking RBD Image(%s): %s", name, err)
return dkvolume.Response{Err: "Unable to get Exclusive Lock"}
}
// map and mount the RBD image -- these are OS level commands, not avail in go-ceph
// map
device, err := d.mapImage(pool, name)
if err != nil {
log.Printf("ERROR: mapping RBD Image(%s) to kernel device: %s", name, err)
// failsafe: need to release lock
defer d.unlockImage(pool, name, locker)
return dkvolume.Response{Err: "Unable to map"}
}
// determine device FS type
fstype, err := d.deviceType(device)
if err != nil {
log.Printf("WARN: unable to detect RBD Image(%s) fstype: %s", name, err)
// NOTE: don't fail - FOR NOW we will assume default plugin fstype
fstype = *defaultImageFSType
}
// double check image filesystem if possible
err = d.verifyDeviceFilesystem(device, mount, fstype)
if err != nil {
log.Printf("ERROR: filesystem may need repairs: %s", err)
// failsafe: need to release lock and unmap kernel device
defer d.unmapImageDevice(device)
defer d.unlockImage(pool, name, locker)
return dkvolume.Response{Err: "Image filesystem has errors, requires manual repairs"}
}
// check for mountdir - create if necessary
err = os.MkdirAll(mount, os.ModeDir|os.FileMode(int(0775)))
if err != nil {
log.Printf("ERROR: creating mount directory: %s", err)
// failsafe: need to release lock and unmap kernel device
defer d.unmapImageDevice(device)
defer d.unlockImage(pool, name, locker)
return dkvolume.Response{Err: "Unable to make mountdir"}
}
// mount
err = d.mountDevice(fstype, device, mount)
if err != nil {
log.Printf("ERROR: mounting device(%s) to directory(%s): %s", device, mount, err)
// need to release lock and unmap kernel device
defer d.unmapImageDevice(device)
defer d.unlockImage(pool, name, locker)
return dkvolume.Response{Err: "Unable to mount device"}
}
// if all that was successful - add to our list of volumes
d.volumes[mount] = &Volume{
name: name,
device: device,
locker: locker,
fstype: fstype,
pool: pool,
}
return dkvolume.Response{Mountpoint: mount}
}
// Get the list of volumes registered with the plugin.
//
// POST /VolumeDriver.List
//
// Request:
// {}
// List the volumes mapped by this plugin.
//
// Response:
// { "Volumes": [ { "Name": "volume_name", "Mountpoint": "/path/to/directory/on/host" } ], "Err": null }
// Respond with an array containing pairs of known volume names and their
// respective paths on the host filesystem (where the volumes have been
// made available).
//
func (d cephRBDVolumeDriver) List(r dkvolume.Request) dkvolume.Response {
vols := make([]*dkvolume.Volume, 0, len(d.volumes))
// for each registered mountpoint
for k, v := range d.volumes {
// append it and its name to the result
vols = append(vols, &dkvolume.Volume{
Name: v.name,
Mountpoint: k,
})
}
log.Printf("INFO: List request => %s", vols)
return dkvolume.Response{Volumes: vols}
}
// Get the volume info.
//
// POST /VolumeDriver.Get
//
// Request:
// { "Name": "volume_name" }
// Docker needs reminding of the path to the volume on the host.
//
// Response:
// { "Volume": { "Name": "volume_name", "Mountpoint": "/path/to/directory/on/host" }, "Err": null }
// Respond with a tuple containing the name of the queried volume and the
// path on the host filesystem where the volume has been made available,
// and/or a string error if an error occurred.
//
func (d cephRBDVolumeDriver) Get(r dkvolume.Request) dkvolume.Response {
// parse full image name for optional/default pieces
pool, name, _, err := d.parseImagePoolNameSize(r.Name)
if err != nil {
log.Printf("ERROR: parsing volume: %s", err)
return dkvolume.Response{Err: err.Error()}
}
// Check to see if the image exists
exists, err := d.rbdImageExists(pool, name)
if err != nil {
log.Printf("WARN: checking for RBD Image: %s", err)
return dkvolume.Response{Err: err.Error()}
}
mountPath := d.mountpoint(pool, name)
if !exists {
log.Printf("WARN: Image %s does not exist", r.Name)
delete(d.volumes, mountPath)
return dkvolume.Response{Err: fmt.Sprintf("Image %s does not exist", r.Name)}
}
log.Printf("INFO: Get request(%s) => %s", name, mountPath)
// TODO: what to do if the mountpoint registry (d.volumes) has a different name?
return dkvolume.Response{Volume: &dkvolume.Volume{Name: r.Name, Mountpoint: mountPath}}
}
// Path returns the path to host directory mountpoint for volume.
//
// POST /VolumeDriver.Path
//
// Request:
// { "Name": "volume_name" }
// Docker needs reminding of the path to the volume on the host.
//
// Response:
// { "Mountpoint": "/path/to/directory/on/host", "Err": null }
// Respond with the path on the host filesystem where the volume has been
// made available, and/or a string error if an error occurred.
//
// NOTE: this method does not require the Ceph connection
//
func (d cephRBDVolumeDriver) Path(r dkvolume.Request) dkvolume.Response {
// parse full image name for optional/default pieces
pool, name, _, err := d.parseImagePoolNameSize(r.Name)
if err != nil {
log.Printf("ERROR: parsing volume: %s", err)
return dkvolume.Response{Err: err.Error()}
}
mountPath := d.mountpoint(pool, name)
log.Printf("INFO: API Path request(%s) => %s", name, mountPath)
return dkvolume.Response{Mountpoint: mountPath}
}
// POST /VolumeDriver.Unmount
//
// - assuming writes are finished and no other containers are using the same disk on this host?
// Request:
// { "Name": "volume_name" }
// Indication that Docker no longer is using the named volume. This is
// called once per container stop. Plugin may deduce that it is safe to
// deprovision it at this point.
//
// Response:
// { "Err": null }
// Respond with a string error if an error occurred.
//
// FIXME: TODO: we are getting an Unmount call from docker daemon after a
// failed Mount (e.g. device locked), which causes the device to be
// unmounted/unmapped/unlocked while possibly in use by another container --
// revisit the API, are we doing something wrong or perhaps we can fail sooner
//
// TODO: utilize the new UnmountRequest.ID field to track volumes
func (d cephRBDVolumeDriver) Unmount(r dkvolume.UnmountRequest) dkvolume.Response {
log.Printf("INFO: API Unmount(%s)", r)
d.m.Lock()
defer d.m.Unlock()
var err_msgs = []string{}
// parse full image name for optional/default pieces
pool, name, _, err := d.parseImagePoolNameSize(r.Name)
if err != nil {
log.Printf("ERROR: parsing volume: %s", err)
return dkvolume.Response{Err: err.Error()}
}
mount := d.mountpoint(pool, name)
// connect to Ceph so we can manipulate RBD Image
if d.useGoCeph {
err = d.connect(pool)
if err != nil {
log.Printf("ERROR: unable to connect to ceph and access pool: %s", err)
return dkvolume.Response{Err: err.Error()}
}
defer d.shutdown()
}
// check if it's in our mounts - we may not know about it if plugin was started late?
vol, found := d.volumes[mount]
if !found {
log.Printf("WARN: Volume is not in known mounts: will attempt limited Unmount: %s/%s", pool, name)
// set up a fake Volume with defaults ...
// - device is /dev/rbd/<pool>/<image> in newer ceph versions
// - assume we are the locker (will fail if locked from another host)
vol = &Volume{
pool: pool,
name: name,
device: fmt.Sprintf("/dev/rbd/%s/%s", pool, name),
locker: d.localLockerCookie(),
}
}
// unmount
// NOTE: this might succeed even if the device is still in use inside the container. The device will disappear from the host side but still be usable inside the container :(
err = d.unmountDevice(vol.device)
if err != nil {
log.Printf("ERROR: unmounting device(%s): %s", vol.device, err)
// failsafe: will still attempt to unmap and unlock
err_msgs = append(err_msgs, "Error unmounting device")
}
// unmap
err = d.unmapImageDevice(vol.device)
if err != nil {
log.Printf("ERROR: unmapping image device(%s): %s", vol.device, err)
// NOTE: rbd unmap exits 16 if device is still being used - unlike umount. try to recover differently in that case
if rbdUnmapBusyRegexp.MatchString(err.Error()) {
// can't always re-mount and not sure if we should here ... will be cleaned up once original container goes away
log.Printf("WARN: unmap failed due to busy device, early exit from this Unmount request.")
return dkvolume.Response{Err: err.Error()}
}
// other error, failsafe: proceed to attempt to unlock
err_msgs = append(err_msgs, "Error unmapping kernel device")
}
// unlock
err = d.unlockImage(vol.pool, vol.name, vol.locker)
if err != nil {
log.Printf("ERROR: unlocking RBD image(%s): %s", vol.name, err)
err_msgs = append(err_msgs, "Error unlocking image")
}
// forget it
delete(d.volumes, mount)
// check for piled up errors
if len(err_msgs) > 0 {
return dkvolume.Response{Err: strings.Join(err_msgs, ", ")}
}
return dkvolume.Response{}
}
// END Docker VolumeDriver Plugin API methods
// ***************************************************************************
// shutdown and connect are used when d.useGoCeph == true
// shutdown closes the connection - maybe not needed unless we recreate conn?
// more info:
// - https://github.com/ceph/go-ceph/blob/f251b53/rados/ioctx.go#L140
// - http://ceph.com/docs/master/rados/api/librados/
func (d *cephRBDVolumeDriver) shutdown() {
log.Println("INFO: Ceph RBD Driver shutdown() called")
if d.ioctx != nil {
d.ioctx.Destroy()
}
if d.conn != nil {
d.conn.Shutdown()
}
}
// connect builds up the ceph conn and default pool
func (d *cephRBDVolumeDriver) connect(pool string) error {
log.Printf("INFO: connect() to Ceph via go-ceph, with pool: %s", pool)
// create the go-ceph Client Connection
var cephConn *rados.Conn
var err error
if d.cluster == "" {
cephConn, err = rados.NewConnWithUser(d.user)
} else {
// FIXME: TODO: can't seem to use a cluster name -- get error -22 from noahdesu/go-ceph/rados:
// panic: Unable to create ceph connection to cluster=ceph with user=admin: rados: ret=-22
cephConn, err = rados.NewConnWithClusterAndUser(d.cluster, d.user)
}
if err != nil {
log.Printf("ERROR: Unable to create ceph connection to cluster=%s with user=%s: %s", d.cluster, d.user, err)
return err
}
// read ceph.conf and setup connection
if d.config == "" {
err = cephConn.ReadDefaultConfigFile()
} else {
err = cephConn.ReadConfigFile(d.config)
}
if err != nil {
log.Printf("ERROR: Unable to read ceph config: %s", err)
return err
}
err = cephConn.Connect()
if err != nil {
log.Printf("ERROR: Unable to connect to Ceph: %s", err)
return err
}
// can now set conn in driver
d.conn = cephConn
// setup the requested pool context
ioctx, err := d.goceph_openContext(pool)
if err != nil {
return err
}
d.ioctx = ioctx
return nil
}
// mountpoint returns the expected path on host
func (d *cephRBDVolumeDriver) mountpoint(pool, name string) string {
return filepath.Join(d.root, pool, name)
}
// parseImagePoolNameSize parses out any optional parameters from Image Name
// passed from docker run. Fills in unspecified options with default pool or
// size.
//
// Returns: pool, image-name, size, error
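//
// e.g. "pool/foo@1024" yields ("pool", "foo", 1024, nil); a bare "foo"
// falls back to the plugin's default pool and default size.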
//
func (d *cephRBDVolumeDriver) parseImagePoolNameSize(fullname string) (pool string, imagename string, size int, err error) {
// Examples of regexp matches:
// foo: ["foo" "" "" "foo" "" ""]
// foo@1024: ["foo@1024" "" "" "foo" "@1024" "1024"]
// pool/foo: ["pool/foo" "pool/" "pool" "foo" "" ""]
// pool/foo@1024: ["pool/foo@1024" "pool/" "pool" "foo" "@1024" "1024"]
//
// Match indices:
// 0: matched string
// 1: pool with slash
// 2: pool no slash
// 3: image name
// 4: size with @
// 5: size only
//
matches := imageNameRegexp.FindStringSubmatch(fullname)
if isDebugEnabled() {
log.Printf("DEBUG: parseImagePoolNameSize: \"%s\": %q", fullname, matches)
}
if len(matches) != 6 {
return "", "", 0, errors.New("Unable to parse image name: " + fullname)
}
// 2: pool
pool = d.pool // default pool for the plugin
if matches[2] != "" {
pool = matches[2]
}
// 3: image
imagename = matches[3]
// 5: size
size = *defaultImageSizeMB
if matches[5] != "" {
var err error
size, err = strconv.Atoi(matches[5])
if err != nil {
log.Printf("WARN: using default. unable to parse int from %s: %s", matches[5], err)
size = *defaultImageSizeMB
}
}
return pool, imagename, size, nil
}
// rbdImageExists will check for an existing Ceph RBD Image
func (d *cephRBDVolumeDriver) rbdImageExists(pool, findName string) (bool, error) {
if d.useGoCeph {
return d.goceph_rbdImageExists(pool, findName)
}
return d.sh_rbdImageExists(pool, findName)
}
// sh_rbdImageExists uses rbd info to check for ceph rbd image
func (d *cephRBDVolumeDriver) sh_rbdImageExists(pool, findName string) (bool, error) {
_, err := d.rbdsh(pool, "info", findName)
if err != nil {
// NOTE: even though method signature returns err - we take the error
// in this instance as the indication that the image does not exist
// TODO: can we double check exit value for exit status 2 ?
return false, nil
}
return true, nil
}
func (d *cephRBDVolumeDriver) goceph_rbdImageExists(pool, findName string) (bool, error) {
log.Printf("INFO: checking if rbdImageExists(%s/%s)", pool, findName)
if findName == "" {
return false, fmt.Errorf("Empty Ceph RBD Image name")
}
ctx, err := d.goceph_openContext(pool)
if err != nil {
return false, err
}
defer d.goceph_shutdownContext(ctx)
img := rbd.GetImage(ctx, findName)
err = img.Open(true)
defer img.Close()
if err != nil {
if err == rbd.RbdErrorNotFound {
log.Printf("INFO: Ceph RBD Image ('%s') not found: %s", findName, err)
return false, nil
}
return false, err
}
return true, nil
}
// goceph_shutdownContext will destroy any non-default ioctx
func (d *cephRBDVolumeDriver) goceph_shutdownContext(ioctx *rados.IOContext) {
if ioctx != nil {
ioctx.Destroy()
}
}
// goceph_openContext provides access to a specific Ceph Pool
func (d *cephRBDVolumeDriver) goceph_openContext(pool string) (*rados.IOContext, error) {
// setup the requested pool context
ioctx, err := d.conn.OpenIOContext(pool)
if err != nil {
// TODO: make sure we aren't hiding a useful error struct by casting to string?
msg := fmt.Sprintf("Unable to open context(%s): %s", pool, err)
log.Printf("ERROR: " + msg)
return ioctx, errors.New(msg)
}
return ioctx, nil
}
// createRBDImage will create a new Ceph block device and make a filesystem on it
func (d *cephRBDVolumeDriver) createRBDImage(pool string, name string, size int, fstype string) error {
// NOTE: there is no goceph_ version of this func - but parts of sh version do (lock/unlock)
return d.sh_createRBDImage(pool, name, size, fstype)
}
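// sh_createRBDImage shells out to create the image and its filesystem. The
// steps below are roughly equivalent to these manual commands (placeholders;
// the pool argument is omitted and the exact invocations are assumptions
// based on the calls in this function):
//   rbd create --image-format 2 --size <MB> <name>
//   rbd lock add <name> <cookie>
//   rbd map <name>                # => /dev/rbdN
//   mkfs.<fstype> /dev/rbdN
//   rbd unmap /dev/rbdN
//   rbd lock remove <name> <lock-id> <locker>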
func (d *cephRBDVolumeDriver) sh_createRBDImage(pool string, name string, size int, fstype string) error {
log.Printf("INFO: Attempting to create new RBD Image: (%s/%s, %s, %s)", pool, name, size, fstype)
// check that fs is valid type (needs mkfs.fstype in PATH)
mkfs, err := exec.LookPath("mkfs." + fstype)
if err != nil {
msg := fmt.Sprintf("Unable to find mkfs for %s in PATH: %s", fstype, err)
return errors.New(msg)
}
// TODO: create a go-ceph Create(..) func for this?
// create the block device image with format=2 (v2)
// should we enable all v2 image features?: +1: layering support +2: striping v2 support +4: exclusive locking support +8: object map support
// NOTE: I tried, but got "2015-08-02 20:24:36.726758 7f87787907e0 -1 librbd: librbd does not support requested features."
// NOTE: I also tried just image-features=4 (locking) - but map will fail:
// sudo rbd unmap mynewvol => rbd: 'mynewvol' is not a block device, rbd: unmap failed: (22) Invalid argument
// "--image-features", strconv.Itoa(4),
_, err = d.rbdsh(
pool, "create",
"--image-format", strconv.Itoa(2),
"--size", strconv.Itoa(size),
name,
)
if err != nil {
return err
}
// lock it temporarily for fs creation
lockname, err := d.lockImage(pool, name)
if err != nil {
// TODO: defer image delete?
return err
}
// map to kernel device
device, err := d.mapImage(pool, name)
if err != nil {
defer d.unlockImage(pool, name, lockname)
return err
}
// make the filesystem - give it some time
_, err = shWithTimeout(5*time.Minute, mkfs, device)
if err != nil {
defer d.unmapImageDevice(device)
defer d.unlockImage(pool, name, lockname)
return err
}
// TODO: should we chown/chmod the directory? e.g. non-root container users won't be able to write
// unmap
err = d.unmapImageDevice(device)
if err != nil {
// ? if we can't unmap -- are we screwed? should we unlock?
return err
}
// unlock
err = d.unlockImage(pool, name, lockname)
if err != nil {
return err
}
return nil
}
// rbdImageIsLocked returns true if named image is already locked
func (d *cephRBDVolumeDriver) rbdImageIsLocked(pool, name string) (bool, error) {
if d.useGoCeph {
return d.goceph_rbdImageIsLocked(pool, name)
}
return d.sh_rbdImageIsLocked(pool, name)
}
func (d *cephRBDVolumeDriver) sh_rbdImageIsLocked(pool, name string) (bool, error) {
// check the output for a lock -- if blank or error, assume not locked (?)
out, err := d.rbdsh(pool, "lock", "ls", name)
if err != nil || out == "" {
return false, err
}
// otherwise - no error and output is not blank - assume a lock exists ...
return true, nil
}
// FIXME: getting panics when trying to run below go-ceph code, at ListLockers():
//
// see https://github.com/yp-engineering/rbd-docker-plugin/issues/3
//
func (d *cephRBDVolumeDriver) goceph_rbdImageIsLocked(pool, name string) (bool, error) {
if pool == "" || name == "" {
return true, errors.New("rbdImageIsLocked: pool and name required")
}
// make the image struct
rbdImage := rbd.GetImage(d.ioctx, name)
// open it (read-only)
err := rbdImage.Open(true)
if err != nil {
log.Printf("ERROR: opening rbd image(%s): %s", name, err)
return true, err
}
defer rbdImage.Close()
// check for locks -- for our purposes, with even one lock we consider image locked
//lockers := []rbd.Locker{}
//lockers := make([]rbd.Locker, 10)
tag, lockers, err := rbdImage.ListLockers()
if err != nil {
log.Printf("ERROR: retrieving Lockers list for Image(%s): %s", name, err)
return true, err
}
if len(lockers) > 0 {
log.Printf("WARN: RBD Image is locked: tag=%s, lockers=%q", tag, lockers)
return true, nil
}
return false, nil
}
// lockImage locks image and returns locker cookie name
func (d *cephRBDVolumeDriver) lockImage(pool, imagename string) (string, error) {
if d.useGoCeph {
return d.goceph_lockImage(pool, imagename)
}
return d.sh_lockImage(pool, imagename)
}
func (d *cephRBDVolumeDriver) sh_lockImage(pool, imagename string) (string, error) {
cookie := d.localLockerCookie()
_, err := d.rbdsh(pool, "lock", "add", imagename, cookie)
if err != nil {
return "", err
}
return cookie, nil
}
func (d *cephRBDVolumeDriver) goceph_lockImage(pool, imagename string) (string, error) {
log.Printf("INFO: lockImage(%s/%s)", pool, imagename)
// build image struct
rbdImage := rbd.GetImage(d.ioctx, imagename)
// open it (read-only)
err := rbdImage.Open(true)
if err != nil {
log.Printf("ERROR: opening rbd image(%s): %s", imagename, err)
return "", err
}
defer rbdImage.Close()
// lock it using hostname
locker := d.localLockerCookie()
err = rbdImage.LockExclusive(locker)