# -*- coding: utf-8 -*- from flask import Flask, request, flash, render_template, redirect, Response, url_for import docker import re import unidecode import datetime import base64 # # from flask_wtf import FlaskForm # from wtforms import StringField # from wtforms.validators import DataRequired clients = {'Docker local': docker.from_env(), 'Docker 132': docker.DockerClient(base_url="tcp://10.66.4.132:2375")} client = docker.from_env() app = Flask(__name__) # # class RenameContainer(FlaskForm): # name = StringField('name', validators=[DataRequired()]) # SECRET_KEY = b'secret' @app.route("/") def index(): data = [] for client, value in clients.items(): try: data.append({'info': value.info(), 'version': value.version(), 'client_name': client, 'volumes': len(value.volumes.list()), 'networks': len(value.networks.list()), 'df': value.df()}) except: pass return render_template('index.html', data=data) # # IMAGES # @app.route('//images/dangling') def images_dangling_action(client_name): if client_name in clients: images = clients[client_name].images.list(filters={'dangling': True}) return render_template('images.html', images=images, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//images/all') def images_all_action(client_name): if client_name in clients: images = clients[client_name].images.list(all=True) return render_template('images.html', images=images, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//images/id/') def image_action(client_name, image_id): if client_name in clients: image = clients[client_name].images.get(image_id) return render_template('image.html', image=image, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//images') def images_action(client_name): if client_name in clients: images = clients[client_name].images.list() return render_template('images.html', images=images, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//images/export/') def export_image_action(client_name, image_tag=None): if client_name in clients: image = clients[client_name].images.get(decode64(image_tag)) return Response(image.save().stream(), mimetype='application/tar', headers={"Content-disposition": "attachment; filename=%s_%s_%s.tar" % ( slugify(client_name), datetime.date.today().strftime("%Y-%m-%d"), slugify(image.attrs['RepoTags'][0]) )}) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//images/remove/') def remove_image_action(client_name, image_tag=None): if client_name in clients: images = clients[client_name].images image = images.get(decode64(image_tag)) flash('Image \'%s\' removed' % image.attrs['RepoTags'][0]) images.remove(decode64(image_tag)) return redirect(request.referrer) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//images/remove_by_id/') def remove_imageby_id_action(client_name, image_id=None): if client_name in clients: flash('Image \'%s\' removed' % image_id) clients[client_name].images.remove(image_id) return redirect(request.referrer) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) # # CONTAINERS # @app.route('//rename_container', methods=['POST']) def rename_container(client_name): if client_name in clients: if request.form['name'] and request.form['short_id']: container = clients[client_name].containers.get(request.form['short_id']) container.rename(request.form['name'].strip()) flash('Renamed container') return redirect(url_for('containers_action')) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/') def containers_compose_action(client_name, compose_project): if client_name in clients: containers = clients[client_name].containers.list( filters={'label': "com.docker.compose.project=" + compose_project}) containers.sort(key=lambda x: x.name) containers.sort(key=lambda x: x.attrs['Config']['Labels'].get('com.docker.compose.project', '')) return render_template('containers.html', containers=containers, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers') def containers_action(client_name): if client_name in clients: containers = clients[client_name].containers.list() containers.sort(key=lambda x: x.name) containers.sort(key=lambda x: x.attrs['Config']['Labels'].get('com.docker.compose.project', '')) return render_template('containers.html', containers=containers, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/all') def containers_all_action(client_name): if client_name in clients: containers = clients[client_name].containers.list(all=True) # sortowanie po nazwie a potem po docker compose project name containers.sort(key=lambda x: x.name) containers.sort(key=lambda x: x.attrs['Config']['Labels'].get('com.docker.compose.project', '')) return render_template('containers.html', containers=containers, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/id/') def container_action(client_name, short_id=None): if client_name in clients: container = clients[client_name].containers.get(short_id) return render_template('container.html', container=container, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/log/', defaults={"timestamp": 0}) @app.route('//containers/log//') def log_action(client_name, timestamp, short_id=None): if client_name in clients: container = clients[client_name].containers.get(short_id) logs = container.logs(timestamps=timestamp > 0).replace(b'\n', bytes("
", 'utf-8')) return render_template('log.html', container=container, logs=logs.decode(), client_name=client_name, short_id=short_id) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/top/') def top_action(client_name, short_id=None): if client_name in clients: container = clients[client_name].containers.get(short_id) top = container.top() return render_template('top.html', container=container, top=top, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/export/') def export_container_action(client_name, short_id=None): if client_name in clients: container = clients[client_name].containers.get(short_id) return Response(container.export().stream(), mimetype='application/tar', headers={"Content-disposition": "attachment; filename=%s_%s_%s_%s.tar" % ( slugify(client_name), datetime.date.today().strftime("%Y-%m-%d"), container.short_id, slugify(container.name) )}) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/remove/') def remove_container_action(client_name, short_id=None): if client_name in clients: container = clients[client_name].containers.get(short_id) container.remove(v=True) flash('Removed container \'%s\' ' % container.name) return redirect(request.referrer) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/stop/') def stop_container_action(client_name, short_id=None): if client_name in clients: container = clients[client_name].containers.get(short_id) container.stop() flash('Stopped container \'%s\' ' % container.name) return redirect(request.referrer) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/start/') def start_container_action(client_name, short_id=None): if client_name in clients: container = clients[client_name].containers.get(short_id) container.start() flash('Started container \'%s\' ' % container.name) return redirect(request.referrer) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/restart/') def restart_container_action(client_name, short_id=None): if client_name in clients: container = clients[client_name].containers.get(short_id) container.restart() flash('Restarted container \'%s\' ' % container.name) return redirect(request.referrer) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//containers/') def containers_status_action(client_name, status=None): if client_name in clients: containers = clients[client_name].containers.list(filters={'status': status}) return render_template('containers.html', containers=containers, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) # # VOLUMES # @app.route('//volumes') def volumes_action(client_name): if client_name in clients: volumes = clients[client_name].volumes.list() return render_template('volumes.html', volumes=volumes, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//volumes/prune') def volumes_prune_action(client_name): if client_name in clients: prune_data = clients[client_name].volumes.prune() if prune_data['VolumesDeleted'] is not None: prune_text = 'Deleted %d volumes: and reclaimed %s space' % ( len(prune_data['VolumesDeleted']), file_size(prune_data['SpaceReclaimed']) ) else: prune_text = "Deleted 0 volumes and reclaimed 0bytes." flash(prune_text) return redirect(url_for('volumes_action', client_name=client_name)) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) @app.route('//volumes/id/') def volume_action(client_name, short_id=None): if client_name in clients: container = clients[client_name].containers.get(short_id) return render_template('container.html', container=container, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) # # NETWORKS # @app.route('//networks') def networks_action(client_name): if client_name in clients: volumes = clients[client_name].networks.list() return render_template('networks.html', networks=volumes, client_name=client_name) else: flash('Client name \'%s\' not found' % client_name) return redirect(url_for('index')) # # TEMPLATE # @app.template_filter('file_size') def file_size(num): suffix = 'B' for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']: if abs(num) < 1024.0: return "%3.1f %s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f%s %s" % (num, 'Yi', suffix) @app.template_filter('encode64') def encode64(data): return base64.b64encode(str.encode(data)) @app.template_filter('decode64') def decode64(data): return base64.b64decode(data).decode() def slugify(text): text = unidecode.unidecode(text).lower() return re.sub(r'\W+', '_', text) app.secret_key = b'\xd7:o\\\xaayFe\x1ey\x08m9\xe4\xbc!\xee\x0e>\xd1Z\x99-\xbb' #http://flask.pocoo.org/snippets/133/ @TODO if __name__ == "__main__": app.run(host="0.0.0.0", port=5001, debug=True, threaded=True)