Skip to content

Gufo ACME Examples: Signing Certificate

We have mastered how to create an ACME server account in our acme_register example. Now it is time to bring all pieces together and get a signed certificate for our domain.

The ACME protocol requires that the client will prove the ownership of the domain by one of available means, called challenge. In out example we will use http-01 type of challenge which requires the client will obtain a token from ACME server and made it available via well-known URL like http://<domain>/.well-known/acme-challenges/<token>.

We consider you have a Nginx server set up and running with config like this:

/etc/nginx/conf.d/
server {
  listen 80;
  server_name <domain>;

  location /.well-known/acme-challenge/ {
    alias /www/acme/;
  }
}

So our task is:

  1. Get a token for domain.
  2. Place file into /www/acme/ directory.
  3. Ask the server to perform validation.
  4. Grab the certificate.

The ACME protocol is quite complex and really require much more stages, but, luckily, Gufo ACME hides all the complexity and provides clean API.

acme_sign.py
import asyncio
import os
import sys

from gufo.acme.clients.base import AcmeClient
from gufo.acme.types import AcmeChallenge

CHALLENGE_DIR = "/www/acme/"


class SignAcmeClient(AcmeClient):
    async def fulfill_http_01(
        self, domain: str, challenge: AcmeChallenge
    ) -> bool:
        v = self.get_key_authorization(challenge)
        with open(os.path.join(CHALLENGE_DIR, challenge.token), "wb") as fp:
            fp.write(v)
        return True

    async def clear_http_01(
        self: AcmeClient, domain: str, challenge: AcmeChallenge
    ) -> None:
        os.unlink(os.path.join(CHALLENGE_DIR, challenge.token))


async def main(
    client_state_path: str, domain: str, csr_path: str, cert_path: str
) -> None:
    with open(client_state_path, "wb") as fp:
        state = fp.read()
    with open(csr_path, "wb") as fp:
        csr = fp.read()
    async with SignAcmeClient.from_state(state) as client:
        cert = await client.sign(domain, csr)
    with open(cert_path, "wb") as fp:
        fp.write(cert)


if __name__ == "__main__":
    asyncio.run(main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]))
The code is straightforward:

acme_sign.py
1
2
3
4
5
6
import asyncio
import os
import sys

from gufo.acme.clients.base import AcmeClient
from gufo.acme.types import AcmeChallenge

AcmeClient is an asynchronous client, so we need asyncio.run() function to launch it.

acme_sign.py
1
2
3
4
5
6
import asyncio
import os
import sys

from gufo.acme.clients.base import AcmeClient
from gufo.acme.types import AcmeChallenge

Import os module which is required to join paths.

acme_sign.py
1
2
3
4
5
6
import asyncio
import os
import sys

from gufo.acme.clients.base import AcmeClient
from gufo.acme.types import AcmeChallenge

Import sys module to parse the CLI argument.

Warning

We use sys.argv only for demonstration purposes. Use argsparse or alternatives in real-world applications.

acme_sign.py
1
2
3
4
5
6
import asyncio
import os
import sys

from gufo.acme.clients.base import AcmeClient
from gufo.acme.types import AcmeChallenge

Then we import an AcmeClient itself.

acme_sign.py
1
2
3
4
5
6
import asyncio
import os
import sys

from gufo.acme.clients.base import AcmeClient
from gufo.acme.types import AcmeChallenge
We also need an AcmeChallenge type.

acme_sign.py
from gufo.acme.types import AcmeChallenge

CHALLENGE_DIR = "/www/acme/"


class SignAcmeClient(AcmeClient):
The crucial ACME protocol concept is the Directory. The directory is an URL which allows to fetch all necessary information about ACME server. In our case we're using Letsencrypt staging directory.

Warning

The staging server should be used only for testing purposes. Replace the DIRECTORY variable with the productive endpoint to get the real certificates.

acme_sign.py
class SignAcmeClient(AcmeClient):
    async def fulfill_http_01(
        self, domain: str, challenge: AcmeChallenge
    ) -> bool:
        v = self.get_key_authorization(challenge)
        with open(os.path.join(CHALLENGE_DIR, challenge.token), "wb") as fp:
            fp.write(v)
        return True

    async def clear_http_01(
        self: AcmeClient, domain: str, challenge: AcmeChallenge
    ) -> None:
        os.unlink(os.path.join(CHALLENGE_DIR, challenge.token))
We need to provide the implementation of the challenge fulfillment. The Gufo ACME's API provides special methods which can be overriden in subclasses to implement the desired behavior. So we are creating a subclass of AcmeClient.

acme_sign.py
class SignAcmeClient(AcmeClient):
    async def fulfill_http_01(
        self, domain: str, challenge: AcmeChallenge
    ) -> bool:
        v = self.get_key_authorization(challenge)
        with open(os.path.join(CHALLENGE_DIR, challenge.token), "wb") as fp:
            fp.write(v)
        return True

    async def clear_http_01(
        self: AcmeClient, domain: str, challenge: AcmeChallenge
    ) -> None:
        os.unlink(os.path.join(CHALLENGE_DIR, challenge.token))
We're implementing http-01 challenge, so we need to override fulfill_http_01 method. This is an asyncronous method so we use any async function inside. It accepts two parameters:

  • domain - a domain name.
  • challenge - a AcmeChallenge structure, which has a token field, containing challenge token.

Function returns True when fulfillment has beed processed correctly, or False, if we wan't provide a fulfillment.

acme_sign.py
class SignAcmeClient(AcmeClient):
    async def fulfill_http_01(
        self, domain: str, challenge: AcmeChallenge
    ) -> bool:
        v = self.get_key_authorization(challenge)
        with open(os.path.join(CHALLENGE_DIR, challenge.token), "wb") as fp:
            fp.write(v)
        return True

    async def clear_http_01(
        self: AcmeClient, domain: str, challenge: AcmeChallenge
    ) -> None:
        os.unlink(os.path.join(CHALLENGE_DIR, challenge.token))
According the ACME protocol, we need to place a specially formed data to prove our authority. The data contain challenge token and the fingerprint of the client's key. The calculation may be tricky, but Gufo ACME provides a AcmeClient.get_key_authorization() method, which performs all necessary calculations. So we pass challenge parameter and grab an authorization data as value of the variable v.

acme_sign.py
class SignAcmeClient(AcmeClient):
    async def fulfill_http_01(
        self, domain: str, challenge: AcmeChallenge
    ) -> bool:
        v = self.get_key_authorization(challenge)
        with open(os.path.join(CHALLENGE_DIR, challenge.token), "wb") as fp:
            fp.write(v)
        return True

    async def clear_http_01(
        self: AcmeClient, domain: str, challenge: AcmeChallenge
    ) -> None:
        os.unlink(os.path.join(CHALLENGE_DIR, challenge.token))
We're building real file name by adding token's value to CHALLENGE_DIR. The autrorization key has type bytes so we open the file in wb mode.

acme_sign.py
class SignAcmeClient(AcmeClient):
    async def fulfill_http_01(
        self, domain: str, challenge: AcmeChallenge
    ) -> bool:
        v = self.get_key_authorization(challenge)
        with open(os.path.join(CHALLENGE_DIR, challenge.token), "wb") as fp:
            fp.write(v)
        return True

    async def clear_http_01(
        self: AcmeClient, domain: str, challenge: AcmeChallenge
    ) -> None:
        os.unlink(os.path.join(CHALLENGE_DIR, challenge.token))
Finally, we return True to sigalize, we have performed fulfillment and ready to start validation.

acme_sign.py
class SignAcmeClient(AcmeClient):
    async def fulfill_http_01(
        self, domain: str, challenge: AcmeChallenge
    ) -> bool:
        v = self.get_key_authorization(challenge)
        with open(os.path.join(CHALLENGE_DIR, challenge.token), "wb") as fp:
            fp.write(v)
        return True

    async def clear_http_01(
        self: AcmeClient, domain: str, challenge: AcmeChallenge
    ) -> None:
        os.unlink(os.path.join(CHALLENGE_DIR, challenge.token))
The ACME protocol definition exlicitly notes that client may clean up prepared data after the validation. AcmeClient allows to add own cleanup code by overriding cleanup_* methods. In our case we're overriding clear_http_01 method. Just like fulfill_http_01, it accepts two parameters:

  • domain - a domain name.
  • challenge - a AcmeChallenge structure, which has a token field, containing challenge token.

acme_sign.py
class SignAcmeClient(AcmeClient):
    async def fulfill_http_01(
        self, domain: str, challenge: AcmeChallenge
    ) -> bool:
        v = self.get_key_authorization(challenge)
        with open(os.path.join(CHALLENGE_DIR, challenge.token), "wb") as fp:
            fp.write(v)
        return True

    async def clear_http_01(
        self: AcmeClient, domain: str, challenge: AcmeChallenge
    ) -> None:
        os.unlink(os.path.join(CHALLENGE_DIR, challenge.token))
We're removing the file created in the fulfill_http_01 method.

acme_sign.py
async def main(
    client_state_path: str, domain: str, csr_path: str, cert_path: str
) -> None:
    with open(client_state_path, "wb") as fp:
        state = fp.read()
    with open(csr_path, "wb") as fp:
        csr = fp.read()
    async with SignAcmeClient.from_state(state) as client:
        cert = await client.sign(domain, csr)
    with open(cert_path, "wb") as fp:
        fp.write(cert)
We define the main function to wrap our code. It assepts the following parameters:

  • client_state_path - a path to a client's state we have created in our acme_register example.
  • domain_name - our domain name.
  • csr_path - a path to the CSR we have created in our get_csr example.
  • cert_path - a path to where we must write a resulting certificate.

Note

The main function is asynchronous

acme_sign.py
async def main(
    client_state_path: str, domain: str, csr_path: str, cert_path: str
) -> None:
    with open(client_state_path, "wb") as fp:
        state = fp.read()
    with open(csr_path, "wb") as fp:
        csr = fp.read()
    async with SignAcmeClient.from_state(state) as client:
        cert = await client.sign(domain, csr)
    with open(cert_path, "wb") as fp:
        fp.write(cert)
We're reading client state from client_state_path. The state is binary so we're opening the file in rb mode.

acme_sign.py
async def main(
    client_state_path: str, domain: str, csr_path: str, cert_path: str
) -> None:
    with open(client_state_path, "wb") as fp:
        state = fp.read()
    with open(csr_path, "wb") as fp:
        csr = fp.read()
    async with SignAcmeClient.from_state(state) as client:
        cert = await client.sign(domain, csr)
    with open(cert_path, "wb") as fp:
        fp.write(cert)
We're reading CSR from csr_path. The state is binary so we're opening the file in rb mode.

acme_sign.py
async def main(
    client_state_path: str, domain: str, csr_path: str, cert_path: str
) -> None:
    with open(client_state_path, "wb") as fp:
        state = fp.read()
    with open(csr_path, "wb") as fp:
        csr = fp.read()
    async with SignAcmeClient.from_state(state) as client:
        cert = await client.sign(domain, csr)
    with open(cert_path, "wb") as fp:
        fp.write(cert)
We're instantiating AcmeClient directly from state by using from_state() method. Note, we're restoring state not into the AcmeClient, but in our SignAcmeClient subclass. The new client instance loads the private key, directory, and account information directly from state.

acme_sign.py
async def main(
    client_state_path: str, domain: str, csr_path: str, cert_path: str
) -> None:
    with open(client_state_path, "wb") as fp:
        state = fp.read()
    with open(csr_path, "wb") as fp:
        csr = fp.read()
    async with SignAcmeClient.from_state(state) as client:
        cert = await client.sign(domain, csr)
    with open(cert_path, "wb") as fp:
        fp.write(cert)
And finally, we call an AcmeClient.sign method, which accepts domain name and CSR. The sign method simple hides all the protocol's complexity and simply returns us a signed certificate in PEM format.

acme_sign.py
async def main(
    client_state_path: str, domain: str, csr_path: str, cert_path: str
) -> None:
    with open(client_state_path, "wb") as fp:
        state = fp.read()
    with open(csr_path, "wb") as fp:
        csr = fp.read()
    async with SignAcmeClient.from_state(state) as client:
        cert = await client.sign(domain, csr)
    with open(cert_path, "wb") as fp:
        fp.write(cert)
We're writing a result into the output file. The certificate has type bytes and we're opening the file in wb mode.

acme_sign.py
if __name__ == "__main__":
    asyncio.run(main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]))
If we're called from command line, get a command line arguments:

  1. Client's state path
  2. Domain name
  3. CSR path
  4. Certificate path

Running

Run the example:

python3 examples/acme_sign.py /tmp/acme.json mydomain.com /tmp/csr.pem /tmp/cert.pem

Check the /tmp/cert.pem file:

/tmp/cert.pem
-----BEGIN CERTIFICATE-----
MIIFZTCCA00CFGMFCNLNSJLkcnJn4XJUGhtHh5JXMA0GCSqGSIb3DQEBCwUAMG8x
CzAJBgNVBAYTAklUMQ8wDQYDVQQIDAZNaWxhbm8xDzANBgNVBAcMBk1pbGFubzES
...
+pqFSNi9tsBy/T9zdVa4giUW68Zc3ezN+t+bvD/qNvAsH+c2ajR8utK0ehv+FpGH
nOfZOASlIEp2te2A6bhHqUqh7LIydzg4YV7FSnfoabO2wDbnHGESZ63/FkyYJHxH
SgFIpXon3mbTvYkSMk+ToN9Fr0n795G37W2pylEfXI28IJ4KpajiheA=
-----END CERTIFICATE-----

Conclusions

In this section we have finally tied all pieces together and wrote a simple certificate signing bot. Refer to the Reference for further details.