It is exceedingly common to want a server to be doing things while it isn't actively handling an HTTP request.
We call these things "background jobs" because they are jobs done in the background.
Add proletarian to your deps.edn. This is a library which manages pulling and executing background jobs scheduled in postgres.
{:paths ["src"]
:deps {org.clojure/clojure {:mvn/version "1.12.0"}
ring/ring {:mvn/version "1.13.0"}
metosin/reitit-ring {:mvn/version "0.7.2"}
org.clojure/tools.logging {:mvn/version "1.3.0"}
org.slf4j/slf4j-simple {:mvn/version "2.0.16"}
hiccup/hiccup {:mvn/version "2.0.0-RC3"}
com.github.seancorfield/next.jdbc {:mvn/version "1.3.955"}
org.postgresql/postgresql {:mvn/version "42.7.4"}
com.zaxxer/HikariCP {:mvn/version "6.0.0"}
io.github.cdimascio/dotenv-java {:mvn/version "3.0.2"}
ring/ring-defaults {:mvn/version "0.5.0"}
msolli/proletarian {:mvn/version "1.0.86-alpha"}}
:aliases {:dev {:extra-paths ["dev" "test"]
:extra-deps {nrepl/nrepl {:mvn/version "1.3.0"}
lambdaisland/kaocha {:mvn/version "1.91.1392"}}}
:format {:deps {dev.weavejester/cljfmt {:mvn/version "0.13.0"}}}
:lint {:deps {clj-kondo/clj-kondo {:mvn/version "2024.09.27"}}}}}
Next, make the tables that proletarian needs. The DDL is documented in the repo for that project. All we need to do is slap that in a migration.
One difference is that we want to give defaults for job_id, queue, attempts, enqueued_at, and process_at. This will make it easier to do inserts directly in triggers, as the sketch after the migration shows.
-- // add_proletarian_tables
CREATE SCHEMA IF NOT EXISTS proletarian;

CREATE TABLE IF NOT EXISTS proletarian.job (
    job_id      UUID      PRIMARY KEY DEFAULT gen_random_uuid(),   -- job id, generated and returned by proletarian.job/enqueue!
    queue       TEXT      NOT NULL DEFAULT ':proletarian/default', -- queue name
    job_type    TEXT      NOT NULL,                                -- job type
    payload     TEXT      NOT NULL,                                -- Transit-encoded job data
    attempts    INTEGER   NOT NULL DEFAULT 0,                      -- Number of attempts. Starts at 0. Increments when the job is processed.
    enqueued_at TIMESTAMP NOT NULL DEFAULT now(),                  -- When the job was enqueued (never changes)
    process_at  TIMESTAMP NOT NULL DEFAULT now()                   -- When the job should be run (updates every retry)
);

CREATE TABLE IF NOT EXISTS proletarian.archived_job (
    job_id      UUID      PRIMARY KEY, -- Copied from job record.
    queue       TEXT      NOT NULL,    -- Copied from job record.
    job_type    TEXT      NOT NULL,    -- Copied from job record.
    payload     TEXT      NOT NULL,    -- Copied from job record.
    attempts    INTEGER   NOT NULL,    -- Copied from job record.
    enqueued_at TIMESTAMP NOT NULL,    -- Copied from job record.
    process_at  TIMESTAMP NOT NULL,    -- Copied from job record (data for the last run only)
    status      TEXT      NOT NULL,    -- success / failure
    finished_at TIMESTAMP NOT NULL     -- When the job was finished (success or failure)
);

CREATE INDEX job_queue_process_at ON proletarian.job (queue, process_at);

-- //@UNDO
DROP TABLE proletarian.archived_job;
DROP TABLE proletarian.job;
DROP SCHEMA proletarian;
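With those defaults in place, an insert only needs to supply job_type and payload. A minimal REPL sketch of what that buys us, assuming db is bound to the connection pool (both values here are placeholders, and the JSON payload will make more sense once we swap in the JSON serializer below):

(require '[next.jdbc :as jdbc])

;; job_id, queue, attempts, enqueued_at, and process_at
;; all fill themselves in from the defaults above.
(jdbc/execute!
 db
 ["INSERT INTO proletarian.job (job_type, payload)
   VALUES (?, ?)"
  ":example/job-type"
  "{\"id\": \"...\"}"])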
Make src/example/jobs.clj and put the following into it. We want the logging code because, by default, the logger proletarian uses is very noisy: you will see a log entry every time it polls to see if there is a new job, which happens frequently.
The handlers map we will fill in with handlers for different job types; process-job will then delegate to whatever it finds there. We use this instead of defmulti because it's easier to reason about whether a given job handler has been registered (a sketch of the defmulti alternative follows the code).
(ns example.jobs
  (:require [clojure.tools.logging :as log]
            [proletarian.worker :as-alias worker]))

(defn log-level
  [x]
  (case x
    ::worker/queue-worker-shutdown-error :error
    ::worker/handle-job-exception-with-interrupt :error
    ::worker/handle-job-exception :error
    ::worker/job-worker-error :error
    ::worker/polling-for-jobs :debug
    :proletarian.retry/not-retrying :error
    :info))

(defn logger
  [x data]
  (log/logp (log-level x) x data))

(defn handlers
  []
  {})

(defn process-job
  [system job-type payload]
  (if-let [handler (get (handlers) job-type)]
    (handler system job-type payload)
    (log/error "Unhandled Job Type" {:job-type job-type})))
Then start a worker for those jobs in the system namespace. This will include wiring up the custom logger from earlier.
(ns example.system
  (:require [example.jobs :as jobs]
            [example.routes :as routes]
            [next.jdbc.connection :as connection]
            [proletarian.worker :as worker]
            [ring.adapter.jetty :as jetty])
  (:import (com.zaxxer.hikari HikariDataSource)
           (io.github.cdimascio.dotenv Dotenv)
           (org.eclipse.jetty.server Server)))

(set! *warn-on-reflection* true)

(defn start-env
  []
  (Dotenv/load))

(defn start-db
  [{::keys [env]}]
  (connection/->pool HikariDataSource
                     {:dbtype "postgres"
                      :dbname "postgres"
                      :username (Dotenv/.get env "POSTGRES_USERNAME")
                      :password (Dotenv/.get env "POSTGRES_PASSWORD")}))

(defn stop-db
  [db]
  (HikariDataSource/.close db))

(defn start-worker
  [{::keys [db] :as system}]
  (let [worker (worker/create-queue-worker
                db
                (partial #'jobs/process-job system)
                {:proletarian/log #'jobs/logger})]
    (worker/start! worker)
    worker))

(defn stop-worker
  [worker]
  (worker/stop! worker))

(defn start-server
  [{::keys [env] :as system}]
  (jetty/run-jetty
   (partial #'routes/root-handler system)
   {:port (parse-long (Dotenv/.get env "PORT"))
    :join? false}))

(defn stop-server
  [server]
  (Server/.stop server))

(defn start-system
  []
  (let [system-so-far {::env (start-env)}
        system-so-far (merge system-so-far {::db (start-db system-so-far)})
        system-so-far (merge system-so-far {::worker (start-worker system-so-far)})]
    (merge system-so-far {::server (start-server system-so-far)})))

(defn stop-system
  [system]
  (stop-server (::server system))
  (stop-worker (::worker system))
  (stop-db (::db system)))
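As a sanity check, you can start and stop all of this from the REPL:

(require '[example.system :as system])

(def running-system (system/start-system))

;; ...poke at it...

(system/stop-system running-system)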
Add cheshire to your deps.edn. cheshire lets us read and write JSON. We want this because we want to serialize our job data as JSON, not Transit, for reasons that will become clear soon.
{:paths ["src"]
:deps {org.clojure/clojure {:mvn/version "1.12.0"}
ring/ring {:mvn/version "1.13.0"}
metosin/reitit-ring {:mvn/version "0.7.2"}
org.clojure/tools.logging {:mvn/version "1.3.0"}
org.slf4j/slf4j-simple {:mvn/version "2.0.16"}
hiccup/hiccup {:mvn/version "2.0.0-RC3"}
com.github.seancorfield/next.jdbc {:mvn/version "1.3.955"}
org.postgresql/postgresql {:mvn/version "42.7.4"}
com.zaxxer/HikariCP {:mvn/version "6.0.0"}
io.github.cdimascio/dotenv-java {:mvn/version "3.0.2"}
ring/ring-defaults {:mvn/version "0.5.0"}
msolli/proletarian {:mvn/version "1.0.86-alpha"}
cheshire/cheshire {:mvn/version "5.13.0"}}
:aliases {:dev {:extra-paths ["dev" "test"]
:extra-deps {nrepl/nrepl {:mvn/version "1.3.0"}
lambdaisland/kaocha {:mvn/version "1.91.1392"}}}
:format {:deps {dev.weavejester/cljfmt {:mvn/version "0.13.0"}}}
:lint {:deps {clj-kondo/clj-kondo {:mvn/version "2024.09.27"}}}}}
Then make a JSON serializer in src/example/jobs.clj.
(ns example.jobs
  (:require [clojure.tools.logging :as log]
            [proletarian.worker :as-alias worker]
            [proletarian.protocols :as protocols]
            [cheshire.core :as cheshire]))

(def json-serializer
  (reify protocols/Serializer
    (encode [_ data]
      (cheshire/generate-string data))
    (decode [_ data-string]
      (cheshire/parse-string data-string keyword))))
...
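Since Serializer is an ordinary protocol, you can sanity-check the round trip at the REPL:

(protocols/encode json-serializer {:id "123"})
;; => "{\"id\":\"123\"}"

(protocols/decode json-serializer "{\"id\":\"123\"}")
;; => {:id "123"}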
And use that JSON serializer when making your worker.
...
(defn start-worker
  [{::keys [db] :as system}]
  (let [worker (worker/create-queue-worker
                db
                (partial #'jobs/process-job system)
                {:proletarian/log #'jobs/logger
                 :proletarian/serializer jobs/json-serializer})]
    (worker/start! worker)
    worker))
...
While it is perfectly fine to enqueue jobs in code, it's often more appropriate to have jobs happen in response to changes in data. To do that, add a trigger that enqueues a job whenever a row is inserted into prehistoric.cave. This is why we added those default values in proletarian.job's DDL and why we bother using a JSON serializer specifically: a trigger can build the payload with row_to_json and let the defaults handle the rest.
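For reference, enqueueing from code looks something like this sketch. The job type and payload are placeholders, and I'm assuming enqueue! accepts the same :proletarian/serializer option as the worker; check the proletarian docs to be sure:

(require '[next.jdbc :as jdbc]
         '[proletarian.job :as job]
         '[example.jobs :as jobs])

;; Enqueueing inside a transaction means the job only exists
;; if the rest of the transaction commits.
(jdbc/with-transaction [tx db]
  (job/enqueue! tx
                :example/some-job
                {:id "..."}
                {:proletarian/serializer jobs/json-serializer}))

The trigger version below does the same kind of insert, just from inside postgres: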
-- // cave insert trigger
CREATE OR REPLACE FUNCTION prehistoric_cave_insert_function()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO proletarian.job (job_type, payload)
    VALUES (':prehistoric.cave/insert', row_to_json(NEW));
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER prehistoric_cave_insert_trigger
    AFTER INSERT ON prehistoric.cave
    FOR EACH ROW
    EXECUTE PROCEDURE prehistoric_cave_insert_function();

-- //@UNDO
DROP TRIGGER prehistoric_cave_insert_trigger ON prehistoric.cave;
DROP FUNCTION prehistoric_cave_insert_function;
Then write a handler that inserts a hominid whenever a cave is inserted. In this situation we could have just done the cascading insert in a trigger of its own, but this is the more generically useful approach, so it's worth showing.
(ns example.cave.jobs
  (:require [example.system :as-alias system]
            [next.jdbc :as jdbc])
  (:import (java.util UUID)))

(set! *warn-on-reflection* true)

(defn process-cave-insert
  [{::system/keys [db]} _job-type payload]
  (jdbc/execute!
   db
   ["INSERT INTO prehistoric.hominid (name, cave_id)
     VALUES (?, ?)"
    "Grunk"
    (UUID/fromString (:id payload))]))

(defn handlers
  []
  {:prehistoric.cave/insert #'process-cave-insert})
Just like with HTTP handlers, whenever you have background job handlers to register, you manually merge them in with all the existing ones.
(ns example.jobs
  (:require [cheshire.core :as cheshire]
            [clojure.tools.logging :as log]
            [example.cave.jobs :as cave-jobs]
            [proletarian.protocols :as protocols]
            [proletarian.worker :as-alias worker]))

(def json-serializer
  (reify protocols/Serializer
    (encode [_ data]
      (cheshire/generate-string data))
    (decode [_ data-string]
      (cheshire/parse-string data-string keyword))))

(defn log-level
  [x]
  (case x
    ::worker/queue-worker-shutdown-error :error
    ::worker/handle-job-exception-with-interrupt :error
    ::worker/handle-job-exception :error
    ::worker/job-worker-error :error
    ::worker/polling-for-jobs :debug
    :proletarian.retry/not-retrying :error
    :info))

(defn logger
  [x data]
  (log/logp (log-level x) x data))

(defn handlers
  []
  (merge
   {}
   (cave-jobs/handlers)))

(defn process-job
  [system job-type payload]
  (if-let [handler (get (handlers) job-type)]
    (handler system job-type payload)
    (log/error "Unhandled Job Type" {:job-type job-type})))
After all that setup, you can go to /cave and insert a cave. You should see logs showing that the background job was picked up and run by the worker. If you look in the prehistoric.hominid table, you will see that the correct row was inserted.
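You can also verify this from the REPL, assuming db is the pool from the running system:

(require '[next.jdbc :as jdbc])

;; The hominid created by the job handler
(jdbc/execute! db ["SELECT * FROM prehistoric.hominid"])

;; Finished jobs get moved to the archive table with a status
(jdbc/execute! db ["SELECT job_type, status, finished_at
                    FROM proletarian.archived_job"])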