Fixes some python style issues.

No changes in binders functionality are included in this commit, just improved
python style
This commit is contained in:
Daniel Roschka 2015-06-13 12:05:42 +02:00
parent ec85c465d8
commit 20d785840b
12 changed files with 294 additions and 250 deletions

View File

@ -4,6 +4,7 @@ from models import BindServer, Key
from django.contrib import admin
from django.forms import ModelForm, ValidationError
class BindServerAdminForm(ModelForm):
def clean_statistics_port(self):
port = self.cleaned_data["statistics_port"]
@ -13,7 +14,6 @@ class BindServerAdminForm(ModelForm):
params={'port': port})
return self.cleaned_data["statistics_port"]
def clean_dns_port(self):
port = self.cleaned_data["dns_port"]
if port < 1 or port > 65535:
@ -31,7 +31,7 @@ class BindServerAdmin(admin.ModelAdmin):
class KeyAdminForm(ModelForm):
def clean_data(self):
try:
keyring = dns.tsigkeyring.from_text({'': self.cleaned_data["data"]})
dns.tsigkeyring.from_text({'': self.cleaned_data["data"]})
except binascii.Error as err:
raise ValidationError("Invalid key data: %(error)s",
params={'error': err})

View File

@ -1,32 +1,34 @@
### Binder Exceptions
# Binder Exceptions
class TransferException(Exception):
"""
Thrown when an AXFR transfer cannot be performed.
"""
"""Thrown when an AXFR transfer cannot be performed."""
pass
class ZoneException(Exception):
"""
Thrown when there is an issue dealing with a
DNS zone.
"""
"""Thrown when there is an issue dealing with a DNS zone."""
pass
class RecordException(Exception):
"""
Thrown when there is an issue dealign with
a Record.
"""Thrown when there is an issue dealign with a Record.
* Adding or deleting.
"""
pass
class KeyringException(Exception):
"""
Thrown when there is a problem creating the keyring.
"""Thrown when there is a problem creating the keyring.
* When the length/padding of the TSIG data is incorrect.
"""

View File

@ -1,4 +1,4 @@
### Binder Forms
# Binder Forms
# 3rd Party
from django import forms
@ -8,10 +8,11 @@ from django.forms import ValidationError
# App Imports
from models import Key
### Custom Form Fields
class CustomUnicodeListField(forms.CharField):
"""Convert unicode item list to list of strings."""
def clean(self, value):
try:
string_list = [str(cur_rr) for cur_rr in eval(value)]
@ -19,12 +20,17 @@ class CustomUnicodeListField(forms.CharField):
raise ValidationError("Error in converting Unicode list to list of Strings: %r" % value)
return string_list
class CustomStringPeriodSuffix(forms.CharField):
""" Convert unicode to string and make sure period is last character. """
### This seems very unclean. Need a better to way to complete the fqdn
### depending on if it ends in a period.
### TODO(jforman): Add Regex check in here for valid rr data
### http://www.zytrax.com/books/dns/apa/names.html
"""Convert unicode to string and make sure period is last character.
This seems very unclean. Need a better to way to complete the fqdn
depending on if it ends in a period.
TODO(jforman): Add Regex check in here for valid rr data
http://www.zytrax.com/books/dns/apa/names.html
"""
def clean(self, value):
try:
new_string = str(value)
@ -34,42 +40,60 @@ class CustomStringPeriodSuffix(forms.CharField):
raise ValidationError("Unable to stick a period on the end of your input: %r" % value)
return new_string
### Form Models
class FormAddForwardRecord(forms.Form):
"""Form used to add a Forward DNS record."""
dns_server = forms.CharField(max_length=100)
record_name = forms.RegexField(max_length=100, regex="^[a-zA-Z0-9-_]+$", required=False)
record_name = forms.RegexField(max_length=100,
regex="^[a-zA-Z0-9-_]+$",
required=False)
record_type = forms.ChoiceField(choices=settings.RECORD_TYPE_CHOICES)
zone_name = forms.CharField(max_length=100)
record_data = forms.GenericIPAddressField()
ttl = forms.ChoiceField(choices=settings.TTL_CHOICES)
create_reverse = forms.BooleanField(required=False)
key_name = forms.ModelChoiceField(queryset=Key.objects.all(), required=False)
key_name = forms.ModelChoiceField(queryset=Key.objects.all(),
required=False)
class FormAddReverseRecord(forms.Form):
"""Form used to add a Reverse (PTR) DNS record."""
dns_server = forms.CharField(max_length=100)
record_name = forms.IntegerField(min_value=0, max_value=255)
record_type = forms.RegexField(regex=r"^PTR$",error_messages={"invalid" : "The only valid choice here is PTR."})
record_type = forms.RegexField(regex=r"^PTR$",
error_messages={"invalid": "The only valid choice here is PTR."})
zone_name = forms.CharField(max_length=100)
record_data = CustomStringPeriodSuffix(required=True)
ttl = forms.ChoiceField(choices=settings.TTL_CHOICES)
key_name = forms.ModelChoiceField(queryset=Key.objects.all(), required=False)
key_name = forms.ModelChoiceField(queryset=Key.objects.all(),
required=False)
create_reverse = forms.BooleanField(required=False)
class FormAddCnameRecord(forms.Form):
"""Form used to add a CNAME record."""
dns_server = forms.CharField(max_length=100)
originating_record = forms.CharField(max_length=100)
cname = forms.RegexField(max_length=100, regex="^[a-zA-Z0-9-_]+$")
zone_name = forms.CharField(max_length=256)
ttl = forms.ChoiceField(choices=settings.TTL_CHOICES)
key_name = forms.ModelChoiceField(queryset=Key.objects.all(), required=False)
key_name = forms.ModelChoiceField(queryset=Key.objects.all(),
required=False)
class FormDeleteRecord(forms.Form):
"""Final form to delete DNS record(s)."""
dns_server = forms.CharField(max_length=100)
zone_name = forms.CharField(max_length=256)
rr_list = CustomUnicodeListField()
key_name = forms.ModelChoiceField(queryset=Key.objects.all(), required=False)
key_name = forms.ModelChoiceField(queryset=Key.objects.all(),
required=False)

View File

@ -1,10 +1,8 @@
### Binder Helpers
# Binder Helpers
# Standard Imports
import binascii
import re
import socket
import sys
# 3rd Party
import dns.query
@ -14,9 +12,11 @@ import dns.tsigkeyring
import dns.update
# App Imports
from binder import exceptions, models
from binder import models
def add_record(dns_server, zone_name, record_name, record_type, record_data, ttl, key_name, create_reverse=False):
def add_record(dns_server, zone_name, record_name, record_type, record_data,
ttl, key_name, create_reverse=False):
"""Parse passed elements and determine which records to create.
Args:
@ -32,9 +32,9 @@ def add_record(dns_server, zone_name, record_name, record_type, record_data, ttl
Return:
Dict containing {description, output} from record creation
"""
response = []
response.append({ "description" : "Forward Record Creation: %s.%s" % (record_name, zone_name),
response.append({"description": "Forward Record Creation: %s.%s" %
(record_name, zone_name),
"output": create_update(dns_server,
zone_name,
record_name,
@ -44,7 +44,9 @@ def add_record(dns_server, zone_name, record_name, record_type, record_data, ttl
key_name)})
"""If requested, create a reverse PTR record.
Given the forward record created, resolve its underlying IP. Use that to create the reverse record.
Given the forward record created, resolve its underlying IP.
Use that to create the reverse record.
reverse_ip_fqdn ex: 5.0.20.10.in-addr.arpa.
reverse_ip: 5
reverse_domain: 0.20.10.in-addr.arpa.
@ -66,9 +68,10 @@ def add_record(dns_server, zone_name, record_name, record_type, record_data, ttl
return response
def add_cname_record(dns_server, zone_name, cname, originating_record, ttl, key_name):
"""Add a Cname record."""
def add_cname_record(dns_server, zone_name, cname, originating_record, ttl,
key_name):
"""Add a CNAME record."""
output = create_update(dns_server,
zone_name,
cname,
@ -77,12 +80,13 @@ def add_cname_record(dns_server, zone_name, cname, originating_record, ttl, key_
ttl,
key_name)
return [{ "description" : "CNAME %s.%s points to %s" % (cname, zone_name, originating_record),
return [{"description": "CNAME %s.%s points to %s" %
(cname, zone_name, originating_record),
"output": output}]
def delete_record(dns_server, rr_list, key_name):
"""Delete a list of DNS records passed as strings in rr_items."""
server = models.BindServer.objects.get(hostname=dns_server)
try:
@ -99,18 +103,24 @@ def delete_record(dns_server, rr_list, key_name):
record_list = current_rr.split(".", 1)
record = record_list[0]
domain = record_list[1]
dns_update = dns.update.Update(domain, keyring=keyring, keyalgorithm=algorithm)
dns_update = dns.update.Update(domain,
keyring=keyring,
keyalgorithm=algorithm)
dns_update.delete(record)
output = send_dns_update(dns_update, dns_server, server.dns_port, key_name)
output = send_dns_update(dns_update,
dns_server,
server.dns_port,
key_name)
delete_response.append({"description": "Delete Record: %s" % current_rr,
"output": output})
return delete_response
def create_update(dns_server, zone_name, record_name, record_type, record_data, ttl, key_name):
""" Update/Create DNS record of name and type with passed data and ttl. """
def create_update(dns_server, zone_name, record_name, record_type, record_data,
ttl, key_name):
"""Update/Create DNS record of name and type with passed data and ttl."""
server = models.BindServer.objects.get(hostname=dns_server)
try:
@ -122,14 +132,18 @@ def create_update(dns_server, zone_name, record_name, record_type, record_data,
keyring = transfer_key.create_keyring()
algorithm = transfer_key.algorithm
dns_update = dns.update.Update(zone_name, keyring=keyring, keyalgorithm=algorithm)
dns_update = dns.update.Update(zone_name,
keyring=keyring,
keyalgorithm=algorithm)
dns_update.replace(record_name, ttl, record_type, record_data)
output = send_dns_update(dns_update, dns_server, server.dns_port, key_name)
return output
def ip_info(host_name):
"""Create a dictionary mapping address types to their IP's.
If an error is encountered, key to error is "Error".
"""
info = []
@ -148,6 +162,7 @@ def ip_info(host_name):
return info
def send_dns_update(dns_message, dns_server, port, key_name):
"""Send DNS message to server and return response.
@ -159,14 +174,13 @@ def send_dns_update(dns_message, dns_server, port, key_name):
Returns:
String output
"""
try:
output = dns.query.tcp(dns_message, dns_server, port=port)
except dns.tsig.PeerBadKey:
output = ("DNS server %s is not configured for TSIG key: %s." %
(dns_server, key_name))
except dns.tsig.PeerBadSignature:
output = ("DNS server %s did like the TSIG signature we sent. Check key %s "
"for correctness." % (dns_server, key_name))
output = ("DNS server %s did like the TSIG signature we sent. Check "
"key %s for correctness." % (dns_server, key_name))
return output

View File

@ -3,7 +3,6 @@
# Standard Imports
import binascii
import socket
import urllib2
# 3rd Party
from pybindxml import reader as bindreader
@ -22,20 +21,24 @@ TSIG_ALGORITHMS = (('HMAC-MD5.SIG-ALG.REG.INT', 'MD5'),
('hmac-sha384', 'SHA384'),
('hmac-sha512', 'SHA512'))
class Key(models.Model):
"""Store and reference TSIG keys.
TODO: Should/Can we encrypt these DNS keys in the DB?
"""
name = models.CharField(max_length=255,
unique=True,
help_text="A human readable name for the key to store, used for "
"further references to the key.")
help_text="A human readable name for the key to "
"store, used for further references to the key.")
data = models.CharField(max_length=255,
help_text="The private part of the TSIG key.")
algorithm = models.CharField(max_length=255,
choices=TSIG_ALGORITHMS,
help_text="The algorithm which has been used for the key.")
help_text="The algorithm which has been used "
"for the key.")
def __unicode__(self):
return self.name
@ -56,10 +59,12 @@ class Key(models.Model):
class BindServer(models.Model):
""" Store DNS servers and attributes for referencing their
statistics ports. Also reference FK for TSIG transfer keys,
if required.
"""Store DNS servers and attributes for referencing their statistics ports.
Also reference FK for TSIG transfer keys, if required.
"""
hostname = models.CharField(max_length=255,
unique=True,
help_text="Host name or IP address of the BIND server.")
@ -110,7 +115,6 @@ class BindServer(models.Model):
Returns:
List of Dicts { String rr_name, String rr_ttl, String rr_class, String rr_type, String rr_data }
"""
try:
transfer_key = Key.objects.get(name=self.default_transfer_key)
except Key.DoesNotExist:
@ -154,4 +158,3 @@ class BindServer(models.Model):
record_array.append(rr_dict)
return record_array

View File

@ -2,7 +2,8 @@ from django.core.urlresolvers import reverse
from django.test import TestCase
from django.test.client import Client
from binder import helpers, models
from binder import models
class Integration_Tests(TestCase):
fixtures = ["binder/fixtures/binder_test.json"]
@ -34,8 +35,7 @@ class Integration_Tests(TestCase):
"""Delete record1.domain1.local"""
delete_dict = {"dns_server": self.testserver,
"zone_name": "domain1.local",
"rr_list" : '[u"record1.domain1.local", u"record2.domain1.local"]',
}
"rr_list": '[u"record1.domain1.local", u"record2.domain1.local"]'}
response = self.client.post(reverse("delete_record_result"), delete_dict)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.context["response"]), 2)
@ -44,7 +44,6 @@ class Integration_Tests(TestCase):
self.assertRegexpMatches(dns_update_output, "opcode UPDATE")
self.assertRegexpMatches(dns_update_output, "rcode NOERROR")
def test_Integration_Add_Cname(self):
"""Add CNAME cnametest1 after adding associated A record record1."""
add_dict = {"dns_server": self.testserver,
@ -68,8 +67,7 @@ class Integration_Tests(TestCase):
"originating_record": "record1.domain1.local",
"cname": "cnametest1",
"zone_name": "domain1.local",
"ttl" : 86400,
}
"ttl": 86400}
response = self.client.post(reverse("add_cname_result"), cname_dict)
self.assertEqual(response.status_code, 200)
for current_response in response.context["response"]:
@ -77,7 +75,6 @@ class Integration_Tests(TestCase):
self.assertRegexpMatches(dns_update_output, "opcode UPDATE")
self.assertRegexpMatches(dns_update_output, "rcode NOERROR")
def test_Integration_ServerZoneList_ConnectionRefused(self):
"""Confirm connection refused on a server zone list."""
dns_server = models.BindServer.objects.get(hostname="testserver1")
@ -94,11 +91,14 @@ class Integration_Tests(TestCase):
def test_Integration_ZoneList_MissingTransferKey(self):
"""Attempt to list a zone's records with missing TSIG key.
domain3.local should be configured to require a TSIG key
for transfers."""
for transfers.
"""
dns_server = models.BindServer.objects.get(hostname="testserver1")
response = self.client.get(reverse("zone_list", args=("testserver1", "domain3.local")))
self.assertEqual(response.status_code, 200)
self.assertEqual(response.context["zone_name"], "domain3.local")
self.assertEqual(response.context["dns_server"], "testserver1")
self.assertRegexpMatches(str(response.context["errors"]), "Unable to perform AXFR to list zone records. Did you forget to specify a default transfer key?")
self.assertRegexpMatches(str(response.context["errors"]),
"Unable to perform AXFR to list zone records. Did you forget to specify a default transfer key?")

View File

@ -2,6 +2,7 @@ from django.test import TestCase
from binder import forms
class Form_Tests(TestCase):
def test_Valid_FormAddRecord(self):
"""Test FormAddRecord with valid data, with/without create_reverse."""
@ -40,8 +41,6 @@ class Form_Tests(TestCase):
reverseform_1.is_valid()
self.assertTrue(reverseform_1.is_valid())
def test_MissingData_FormAddRecord(self):
"""Submit FormAddRecord with missing record_data."""
form_data = {"dns_server": "server1",
@ -83,8 +82,7 @@ class Form_Tests(TestCase):
"""Validate good data in the FormDeleteRecord form."""
delete_dict = {"dns_server": "foo.net",
"zone_name": "domain1.local",
"rr_list" : '[u"record1.domain1.local", u"record2.domain1.local"]',
}
"rr_list": '[u"record1.domain1.local", u"record2.domain1.local"]'}
testform_1 = forms.FormDeleteRecord(delete_dict)
testform_1.is_valid
self.assertFalse(testform_1.errors)

View File

@ -3,6 +3,7 @@ from django.db import IntegrityError
from binder import models
class Model_BindServer_Tests(TestCase):
def test_BindServerModel(self):
"""Test that adding a well-formed BindServer works."""
@ -38,4 +39,4 @@ class Model_Key_Tests(TestCase):
def test_NonExistantKey(self):
with self.assertRaisesMessage(models.Key.DoesNotExist, "Key matching query does not exist"):
this_key = models.Key.objects.get(name="does_not_exist")
models.Key.objects.get(name="does_not_exist")

View File

@ -2,11 +2,13 @@ from django.test import TestCase
from django.test.client import Client
from django.core.urlresolvers import reverse
from binder import models, helpers
from binder import models
class GetTests(TestCase):
"""Unit Tests that exercise HTTP GET."""
def setUp(self):
self.client = Client()
@ -33,21 +35,24 @@ class GetTests(TestCase):
def test_GetInvalidServer(self):
"""Get a zone list for a server not in the database."""
server_name = "unconfigured.server.net"
response = self.client.get(reverse("server_zone_list", args=(server_name, )))
response = self.client.get(reverse("server_zone_list",
args=(server_name, )))
self.assertEqual(response.status_code, 404)
class PostTests(TestCase):
"""Unit Tests that exercise HTTP POST."""
def setUp(self):
self.client = Client()
models.BindServer(hostname="testserver.test.net",
statistics_port=1234).save()
def test_DeleteRecordInitial_Empty(self):
"""Ensure the initial deletion form works as expected with no RR list."""
response = self.client.post(reverse("delete_record"), { "dns_server" : "testserver.test.net",
response = self.client.post(reverse("delete_record"),
{"dns_server": "testserver.test.net",
"zone_name": "testzone1.test.net",
"rr_list": []})

View File

@ -1,6 +1,4 @@
from django.conf.urls import patterns, include, url
from django.core.urlresolvers import reverse
from django.contrib import admin
admin.autodiscover()

View File

@ -1,4 +1,4 @@
### Binder VIews
# Binder VIews
# 3rd Party
from django.conf import settings
@ -7,6 +7,7 @@ from django.shortcuts import get_object_or_404, redirect, render
# App Imports
from binder import exceptions, forms, helpers, models
def home_index(request):
"""List the main index page for Binder."""
return render(request, "index.html")
@ -17,7 +18,8 @@ def view_server_list(request):
server_list = models.BindServer.objects.all().order_by("hostname")
server_info = []
for current in server_list:
server_info.append({"host_name" : current, "ip_address" : helpers.ip_info(current.hostname)})
server_info.append({"host_name": current,
"ip_address": helpers.ip_info(current.hostname)})
return render(request, "bcommon/list_servers.html",
{"server_info": server_info})
@ -72,8 +74,7 @@ def view_add_record(request, dns_server, zone_name):
"zone_name": zone_name,
"tsig_keys": models.Key.objects.all(),
"ttl_choices": settings.TTL_CHOICES,
"record_type_choices": settings.RECORD_TYPE_CHOICES,
})
"record_type_choices": settings.RECORD_TYPE_CHOICES})
def view_add_record_result(request):
@ -125,7 +126,6 @@ def view_add_record_result(request):
def view_add_cname_record(request, dns_server, zone_name, record_name):
"""Process given input to add a CNAME pointer."""
this_server = get_object_or_404(models.BindServer, hostname=dns_server)
return render(request, "bcommon/add_cname_record_form.html",

View File

@ -8,7 +8,6 @@ https://docs.djangoproject.com/en/1.7/howto/deployment/wsgi/
"""
import os
import sys
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "binder.settings")
from django.core.wsgi import get_wsgi_application