Skip to content
This repository has been archived by the owner on Sep 12, 2018. It is now read-only.

Registry v1/v2 connector #1024

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 70 additions & 22 deletions docker_registry/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datetime
import functools
import logging
import re
import time

import flask
Expand All @@ -17,6 +18,8 @@
from .app import cfg
from .lib import cache
from .lib import checksums
# golang
from .lib import golangconnector
from .lib import layers
from .lib import mirroring
from .lib import signals
Expand All @@ -27,6 +30,10 @@

store = storage.load()
logger = logging.getLogger(__name__)
_re_hex_image_id = re.compile(r'^([a-f0-9]{16}|[a-f0-9]{64})$')

# golang
gconnect = golangconnector.Connector(store)


def require_completion(f):
Expand All @@ -39,6 +46,16 @@ def wrapper(*args, **kwargs):
return wrapper


def valid_image_id(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
image_id = kwargs.get('image_id', '')
if _re_hex_image_id.match(image_id):
return f(*args, **kwargs)
return toolkit.api_error("Invalid image ID", 404)
return wrapper


def set_cache_headers(f):
"""Returns HTTP headers suitable for caching."""
@functools.wraps(f)
Expand Down Expand Up @@ -82,7 +99,15 @@ def _get_image_layer(image_id, headers=None, bytes_range=None):
# offload a lot of expensive I/O and get faster I/O
if cfg.storage_redirect:
try:
content_redirect_url = store.content_redirect_url(path)
# golang
try:
gopath = gconnect.layer(image_id)
content_redirect_url = store.content_redirect_url(
json.loads(gopath)['gopath'] if gopath else path
)
except Exception as e:
logger.debug('Fail, but survive %s' % e)
content_redirect_url = store.content_redirect_url(path)
if content_redirect_url:
return flask.redirect(content_redirect_url, 302)
except IOError as e:
Expand All @@ -91,6 +116,9 @@ def _get_image_layer(image_id, headers=None, bytes_range=None):
status = None
layer_size = 0

gopath = gconnect.layer(image_id)
path = json.loads(gopath)['gopath'] if gopath else path

if not store.exists(path):
raise exceptions.FileNotFoundError("Image layer absent from store")
try:
Expand Down Expand Up @@ -120,17 +148,22 @@ def _get_image_layer(image_id, headers=None, bytes_range=None):
def _get_image_json(image_id, headers=None):
if headers is None:
headers = {}
data = store.get_content(store.image_json_path(image_id))
try:
size = store.get_size(store.image_layer_path(image_id))
headers['X-Docker-Size'] = str(size)
except exceptions.FileNotFoundError:
pass
try:
csums = load_checksums(image_id)
headers['X-Docker-Checksum-Payload'] = csums
except exceptions.FileNotFoundError:
pass
# golang
data = gconnect.layer(image_id)
if data:
data = json.dumps(json.loads(data)['legacy'])
else:
data = store.get_content(store.image_json_path(image_id))
try:
size = store.get_size(store.image_layer_path(image_id))
headers['X-Docker-Size'] = str(size)
except exceptions.FileNotFoundError:
pass
try:
csums = load_checksums(image_id)
headers['X-Docker-Payload-Checksum'] = csums
except exceptions.FileNotFoundError:
pass
return toolkit.response(data, headers=headers, raw=True)


Expand Down Expand Up @@ -171,6 +204,7 @@ def _valid_bytes_range(bytes_range):

@app.route('/v1/images/<image_id>/layer', methods=['GET'])
@toolkit.requires_auth
@valid_image_id
@require_completion
@toolkit.valid_image_id
@set_cache_headers
Expand All @@ -183,8 +217,12 @@ def get_image_layer(image_id, headers):
bytes_range = _parse_bytes_range()
repository = toolkit.get_repository()
if repository and store.is_private(*repository):
if not toolkit.validate_parent_access(image_id):
return toolkit.api_error('Image not found', 404)
# This is not ideal
ongolang = gconnect.layer(image_id)
# Private, no v2 spoof, must check
if not ongolang:
if not toolkit.validate_parent_access(image_id):
return toolkit.api_error('Image not found', 404)
# If no auth token found, either standalone registry or privileged
# access. In both cases, access is always "public".
return _get_image_layer(image_id, headers, bytes_range)
Expand Down Expand Up @@ -267,8 +305,12 @@ def get_image_json(image_id, headers):
try:
repository = toolkit.get_repository()
if repository and store.is_private(*repository):
if not toolkit.validate_parent_access(image_id):
return toolkit.api_error('Image not found', 404)
ongolang = gconnect.layer(image_id)
# Private, no v2 spoof, must check
# This is not ideal
if not ongolang:
if not toolkit.validate_parent_access(image_id):
return toolkit.api_error('Image not found', 404)
# If no auth token found, either standalone registry or privileged
# access. In both cases, access is always "public".
return _get_image_json(image_id, headers)
Expand All @@ -283,12 +325,17 @@ def get_image_json(image_id, headers):
@set_cache_headers
@mirroring.source_lookup(cache=True, stream=False)
def get_image_ancestry(image_id, headers):
ancestry_path = store.image_ancestry_path(image_id)
try:
# Note(dmp): unicode patch
data = store.get_json(ancestry_path)
except exceptions.FileNotFoundError:
return toolkit.api_error('Image not found', 404)
# golang
data = gconnect.layer(image_id)
if data:
data = json.loads(data)['ancestry']
else:
ancestry_path = store.image_ancestry_path(image_id)
try:
# Note(dmp): unicode patch
data = store.get_json(ancestry_path)
except exceptions.FileNotFoundError:
return toolkit.api_error('Image not found', 404)
return toolkit.response(data, headers=headers)


Expand Down Expand Up @@ -329,6 +376,7 @@ def load_checksums(image_id):


@app.route('/v1/images/<image_id>/json', methods=['PUT'])
@valid_image_id
@toolkit.requires_auth
@toolkit.valid_image_id
def put_image_json(image_id):
Expand Down
132 changes: 132 additions & 0 deletions docker_registry/lib/golangconnector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# -*- coding: utf-8 -*-

import functools
import json
import logging
import time
from docker_registry.core import lru

logger = logging.getLogger(__name__)


def timeit(f):
@functools.wraps(f)
def wrapper(*args):
start = time.time()
content = f(*args)
end = time.time()
logger.info('Done in %s' % (end - start))
return content
return wrapper


class Connector(object):

def __init__(self, driver):
logger.debug('Entering gconnector')
self.driver = driver
self._imageshard = 'g2ci_4'
self._layershard = 'g2cl_3'

def _blobpath(self, digest):
digest = digest.split(':')
return '/registry-v2/docker/registry/v2/blobs/%s/%s/%s/data' % (
digest[0], digest[1][:2], digest[1]
)

def _manifestbase(self, user, name):
return '/registry-v2/docker/registry/v2/repositories/%s/%s/_manifests/tags' % (user, name)

def _manifestpath(self, user, name, tag):
return '%s/%s/current/link' % (self._manifestbase(user, name), tag)

@timeit
@lru.get
def _lookup_layer(self, shard):
logger.info('No go layer with that id in the cache: %s' % shard)
# print(lru.redis_conn.get(shard))
return None

# Getting a golang image by digest
# - look it up in the lru, return a raw manifest blob content if it's there
# (one lru call)
# - if it's not there, means we need to go over each layer so:
# * look up the digest on the golang blob store
# * process every layer and create lru entries {gopath: , legacy: } to lru
# (one s3 read, n+1 lru write)
@timeit
# @lru.get
def image_by_digest(self, key):
logger.info('Looking up image by digest %s' % key)
digest = key.split('/')[1]
storepath = self._blobpath(digest)
# jcontent = self.driver.get_json(storepath)
jcontent = json.loads(self.get_no_cache(storepath))
stack = []
for idx, legacy in enumerate(jcontent['history']):
stack.append(json.loads(legacy['v1Compatibility'])['id'])

for idx, goid in enumerate(jcontent['fsLayers']):
logger.info('Registering layer %s' % goid['blobSum'])
phypath = self._blobpath(goid['blobSum'])
phycontent = json.loads(jcontent['history'][idx]['v1Compatibility'])
# phycontent['id'] =
tolru = {
"gopath": phypath,
"legacy": phycontent,
"ancestry": stack[idx:]
}
logger.info('With phyid: %s' % phycontent['id'])

lru.redis_conn.set(
lru.cache_key('%s/%s' % (self._layershard, phycontent['id'])),
json.dumps(tolru))
# print('%s/%s' % (self._layershard, phycontent['id']))
# print(lru.redis_conn.get('%s/%s' % (self._layershard, phycontent['id'])))
return stack[0]
# json.dumps(jcontent)

def get_no_cache(self, path):
path = self.driver._init_path(path)
if hasattr(self.driver, 'makeKey'):
key = self.driver.makeKey(path)
if not key.exists():
raise Exception('%s is not there' % path)
return key.get_contents_as_string()
else:
return self.driver.get_content(path)

# Looking up a specific tag, no caching
# - if it's not there, fail and move on (one read on s3)
# - if it's there, move to the digest part of it
# (one s3 read, on lru write, one lru delete)
def image(self, user, name, tag):
mainkey = self._manifestpath(user, name, tag)
logger.info('Looking up go image: %s' % (mainkey))
try:
# Access the main manifest entry point
content = self.get_no_cache(mainkey)
logger.info('Go image is here')
# Don't store this one
# lru.redis_conn.delete(mainkey)
# Return the digest version
return self.image_by_digest('%s/%s' % (self._imageshard, content))
except Exception as e:
logger.info('No go image, or something wrong %s' % e)

def delete(self, user, name, tag):
self.driver.remove(self._manifestpath(user, name, tag))

# Getting layer infos from golang cache:
# - just look it up in the lru, return a {gopath: , legacy: } object
# - if it's not there, it's not there
def layer(self, id):
return self._lookup_layer('%s/%s' % (self._layershard, id))

def tags(self, user, name):
alltags = self.driver.list_directory(self._manifestbase(user, name))
for i in alltags:
shorttag = i.split('/').pop()
oldid = self.image(user, name, shorttag)
if oldid:
yield (shorttag, oldid)
Loading