Spark and Databricks pipelines move large volumes of text through transformations, and any of it can carry PII: support transcripts, application logs, free-text fields, scraped content. The cleanest place to redact it is in the job itself, as a column transformation, so the data is cleaned in the same pass that processes it and never has to leave the cluster.
There is no packaged Spark connector for this, and none is needed. Phileas is a library, so you wrap it in a user-defined function (UDF) and apply it to any text column. Phileas ships for both the JVM (Java and Scala) and Python, so this guide shows the UDF both ways: PySpark, the natural fit for Databricks, and Java for Spark on the JVM. This is a build-it-yourself pattern, and the guide covers the performance tradeoffs to plan for.
A PySpark redaction UDF
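Install the Phileas Python library on your cluster, then define a UDF that redacts a string and apply it to a column. The Phileas service and policy are built lazily, the first time a worker needs them, and then reused for later rows, so construction is not paid on every record. PySpark runs a separate Python worker process for each concurrently running task, so an executor holds several copies, one per process.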
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from phileas import Policy, FilterService

# The redaction policy: what to detect and how to transform it.
POLICY = {
    "identifiers": {
        "emailAddress": {"emailAddressFilterStrategies": [{"strategy": "REDACT", "redactionFormat": "[EMAIL]"}]},
        "ssn": {"ssnFilterStrategies": [{"strategy": "REDACT", "redactionFormat": "[SSN]"}]},
        "phoneNumber": {"phoneNumberFilterStrategies": [{"strategy": "REDACT", "redactionFormat": "[PHONE]"}]},
    }
}

# Build the service once per worker process, not once per row.
_service = None
_policy = None


def redact(text):
    global _service, _policy
    if text is None:
        return None
    if _service is None:
        _service = FilterService()
        _policy = Policy.from_dict(POLICY)
    return _service.filter(_policy, "spark", "doc", text).filtered_text


redact_udf = udf(redact, StringType())
```
Apply it to a column with the DataFrame API:
```python
cleaned = df.withColumn("note_redacted", redact_udf(df["note"]))
```
On a small sample, "Email a@b.com" becomes "Email [EMAIL]", "SSN 123-45-6789" becomes "SSN [SSN]", and text with no PII is returned unchanged. You can also register the UDF for Spark SQL:
```python
spark.udf.register("redact", redact, StringType())
spark.sql("SELECT id, redact(note) AS note_redacted FROM notes")
```
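Because `redact` is an ordinary Python function, its wrapper logic (skipping nulls, building the service lazily, passing text through) can be checked without a cluster or the Phileas library. The sketch below is hypothetical: `FakeFilterService` and its two regexes are a stand-in that only mimics the shape of the call used above, `filter(policy, context, "doc", text).filtered_text`, where reading the third argument as a document identifier is an assumption. It is not how Phileas detects PII.

```python
import re
from types import SimpleNamespace


class FakeFilterService:
    """Hypothetical stand-in for the Phileas service, for testing the UDF wrapper only."""

    instances = 0  # counts constructions, to confirm the lazy build runs once

    def __init__(self):
        FakeFilterService.instances += 1

    def filter(self, policy, context, document_id, text):
        # Toy regexes, not Phileas detection; the policy argument is ignored here.
        out = re.sub(r"[\w.+-]+@[\w-]+\.[\w.]+", "[EMAIL]", text)
        out = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]", out)
        return SimpleNamespace(filtered_text=out)


_service = None


def redact(text, service_factory=FakeFilterService):
    """Same structure as the UDF body: skip nulls, build the service lazily, reuse it."""
    global _service
    if text is None:
        return None
    if _service is None:
        _service = service_factory()
    return _service.filter(None, "spark", "doc", text).filtered_text


if __name__ == "__main__":
    for sample in ["Email a@b.com", "SSN 123-45-6789", "no PII here", None]:
        print(repr(sample), "->", repr(redact(sample)))
```

In a real test you would swap the factory for the actual Phileas service and a small policy, and keep the same assertions about nulls and unchanged text.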
On Databricks
The same code runs on Databricks unchanged. Install the Phileas Python library on the cluster (as a cluster library, or with %pip install in the notebook), define the UDF, and apply it in a notebook or job. To expose only redacted data to downstream consumers, write the redacted DataFrame to a table and grant access to that rather than to the raw table. A view also works, but Spark does not allow a permanent view to reference a function registered only for the current session, so for a view the redaction function must exist as a persistent catalog function.
A Java redaction UDF
For Spark on the JVM, the same pattern uses the Phileas Java library directly. Build the filter service and policy once, hold them statically so they are reused across rows, and register a UDF1<String, String>. The redaction call is the same filter(policy, context, text) call that Philter and the other Phileas integrations use.
```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

import ai.philterd.phileas.PhileasConfiguration;
import ai.philterd.phileas.policy.Policy;
import ai.philterd.phileas.services.context.DefaultContextService;
import ai.philterd.phileas.services.disambiguation.vector.InMemoryVectorService;
import ai.philterd.phileas.services.filters.filtering.PlainTextFilterService;

import com.google.gson.Gson;

import java.util.Properties;

public final class PhileasUdf {

    // Built once per executor JVM, then shared across all task threads. The service is
    // safe to call concurrently, so redact() takes no lock; only the one-time init is
    // synchronized. Do not wrap the filter() call itself in a lock: that would serialize
    // every row on the executor and throw away Spark's parallelism.
    private static volatile PlainTextFilterService service;

    // Not volatile: init() assigns it before the volatile write to service, so any
    // thread that reads a non-null service also sees the initialized policy.
    private static Policy policy;

    private static synchronized PlainTextFilterService init() throws Exception {
        if (service == null) {
            String json = "{ \"identifiers\": {"
                    + " \"emailAddress\": {\"emailAddressFilterStrategies\":[{\"strategy\":\"REDACT\",\"redactionFormat\":\"[EMAIL]\"}]},"
                    + " \"ssn\": {\"ssnFilterStrategies\":[{\"strategy\":\"REDACT\",\"redactionFormat\":\"[SSN]\"}]} } }";
            policy = new Gson().fromJson(json, Policy.class);
            service = new PlainTextFilterService(
                    new PhileasConfiguration(new Properties()),
                    new DefaultContextService(), new InMemoryVectorService(), null);
        }
        return service;
    }

    private static String redact(String text) throws Exception {
        if (text == null) {
            return null;
        }
        // Fast path: a single volatile read once the service exists.
        PlainTextFilterService svc = service;
        if (svc == null) {
            svc = init();
        }
        return svc.filter(policy, "spark", text).getFilteredText();
    }

    public static void register(SparkSession spark) {
        spark.udf().register("redact", (UDF1<String, String>) PhileasUdf::redact, DataTypes.StringType);
    }
}
```
Register it and apply it the same way, in SQL or with the DataFrame API:
```java
PhileasUdf.register(spark);

// Spark SQL
spark.sql("SELECT id, redact(note) AS note_redacted FROM notes").show(false);

// DataFrame API
Dataset<Row> cleaned = df.selectExpr("id", "redact(note) AS note_redacted");
```
The Scala API is the same idea: register a UDF that calls service.filter(...).getFilteredText(). Whichever language, manage the Phileas dependency in your build and run on a JDK your Spark version supports.
Performance and what to plan for
A UDF runs on every row, so treat it as work on the hot path and tune accordingly.
- Build once per worker, then share it lock-free. Constructing the Phileas service and parsing the policy is not free. Do it once per executor JVM (the lazy singleton above), never per row, and share the one instance across the executor’s task threads. The service is safe to call concurrently, so only the one-time init is synchronized; do not put a lock around the filter() call itself, or you serialize every row and lose Spark’s parallelism. In PySpark, each concurrently running task uses its own Python worker process, so each process builds and holds its own copy.
- Keep the policy lean for throughput. Pattern-based types (emails, phone numbers, SSNs, card numbers) are cheap. Model-based name detection is far heavier; if you only need the structured types, leave the models out.
- For heavy name detection, use a service. Loading a model on every executor is expensive. When you need model-based detection at scale, run the model in PhEye as a service the job calls, rather than embedding it in every worker.
- Partition and batch sensibly. Redaction parallelizes naturally across workers. Size partitions so each carries enough rows to amortize the per-task setup, and create enough of them to keep every executor core busy.
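The first and last points can be sketched together in Python. This assumes the code lives in a module installed on every worker (for example, packaged in a wheel), and `build_service` is a hypothetical placeholder for constructing the Phileas service and policy; here it returns a toy regex redactor so the module runs on its own. The reason for a module: a global defined in a notebook travels with the pickled function, so depending on how Spark ships it, that cache may last only one task, while a module that workers import can keep the instance for the life of a reused Python worker process. `functools.lru_cache` provides the build-once behavior, and `redact_partition` shows the per-partition shape you would hand to `mapPartitions`.

```python
import functools
import re
from typing import Iterable, Iterator, Optional, Tuple


def build_service():
    # Hypothetical placeholder: a real job would construct the Phileas service and
    # parse the policy here. This toy redactor is NOT Phileas detection logic.
    email = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")
    return lambda text: email.sub("[EMAIL]", text)


@functools.lru_cache(maxsize=None)
def get_service():
    # With no arguments, lru_cache memoizes one instance per Python process:
    # the expensive build runs on first use and is reused afterwards.
    return build_service()


def redact_partition(
    rows: Iterable[Tuple[int, Optional[str]]],
) -> Iterator[Tuple[int, Optional[str]]]:
    """Redact one partition of (id, text) rows, fetching the cached service once."""
    service = get_service()
    for row_id, text in rows:
        yield row_id, (None if text is None else service(text))
```

On a DataFrame this would be applied with something like `df.select("id", "note").rdd.mapPartitions(redact_partition)`, since PySpark `Row` objects unpack like tuples. The row-level `redact` UDF can also call `get_service()` in place of its own lazy globals.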
A note on accuracy
Detection is probabilistic and configurable. These tools reduce how much sensitive data gets through; they do not catch every instance. Validate your policy against representative data from your own tables, set thresholds to match your risk tolerance, and treat redaction as one layer of a defense-in-depth strategy. You remain responsible for the data you process.
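One way to do that validation is to hand-label a small sample from your own tables and measure how many known PII values survive redaction. The sketch below assumes you supply `samples` as (text, list of PII strings known to be present) pairs, and a `redact` callable such as the Python function behind the UDF. The `leak_report` name and the toy redactor in the usage lines are illustrative placeholders, not part of Phileas.

```python
from typing import Callable, Iterable, List, Sequence, Tuple


def leak_report(
    redact: Callable[[str], str],
    samples: Iterable[Tuple[str, Sequence[str]]],
) -> Tuple[float, List[str]]:
    """Return (recall, leaked) for hand-labeled samples.

    recall is the fraction of labeled PII values that no longer appear verbatim
    in the redacted text; leaked lists the values that survived.
    """
    total = 0
    leaked: List[str] = []
    for text, pii_values in samples:
        redacted = redact(text)
        for value in pii_values:
            total += 1
            if value in redacted:
                leaked.append(value)
    recall = 1.0 if total == 0 else (total - len(leaked)) / total
    return recall, leaked


if __name__ == "__main__":
    # Placeholder redactor that masks one hard-coded value, to show a leak.
    toy = lambda s: s.replace("a@b.com", "[EMAIL]")
    labeled = [
        ("Email a@b.com", ["a@b.com"]),
        ("Call 555-0100", ["555-0100"]),
    ]
    print(leak_report(toy, labeled))  # prints (0.5, ['555-0100'])
```

The check only catches values that survive verbatim; partial leaks (a masked SSN that keeps its last four digits, for instance) need their own labels or rules.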
Where to go next
- Writing your first redaction policy, and the policy schema guide, for building the policy the UDF applies.
- Self-hosted PII detection for running detection in your own boundary, including in Spark.
- PhEye for serving name-detection models when you need them at scale.