Discovery & Enrichment
Every connector invocation runs two sequential phases:
- Discovery — scrape fresh listings from a third-party site and submit them to Mill.
- Enrichment — drain one batch of re-scrape requests from a Kafka topic and push updates back to Mill.
This dual-phase design lets a single CronJob keep the corpus both fresh and accurate, with no separate deployment for each phase.
Discovery Phase
Discovery works the same way it always has. The connector scrapes paginated search results, extracts individual listing URLs, visits each one, maps the raw HTML/JSON into common.Property, and submits the batch to Mill via HTTP or Kafka.
All connectors implement the Connector interface:
```go
type Connector interface {
    GetName() string
    GetSource() string
    ScrapeProperties(opts ConnectorOptions) ([]Property, error)
    GetStats() ConnectorStats
    SetRateLimit(delay time.Duration)
    HealthCheck() error
}
```
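For orientation, a discovery implementation typically follows the shape below. This is only a sketch: the helpers listSearchPage and scrapeDetailPage, the rateLimit field, and the ConnectorOptions field it reads are assumptions, not the real connector code.

```go
// Hypothetical discovery loop: walk paginated search results, visit each
// listing URL, and map every detail page into a common.Property.
func (c *MyConnector) ScrapeProperties(opts common.ConnectorOptions) ([]common.Property, error) {
    var properties []common.Property

    for page := 1; len(properties) < opts.MaxProperties; page++ { // MaxProperties is an assumed field
        urls, err := c.listSearchPage(page) // assumed helper: listing URLs on one results page
        if err != nil {
            return properties, err
        }
        if len(urls) == 0 {
            break // ran out of search results
        }

        for _, u := range urls {
            prop, err := c.scrapeDetailPage(u) // assumed helper: parse one detail page
            if err != nil {
                continue // skip broken listings, keep the batch going
            }
            properties = append(properties, *prop)
            time.Sleep(c.rateLimit) // honour SetRateLimit between requests
        }
    }
    return properties, nil
}
```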
Enrichment Phase
After discovery, if the connector also implements the Enricher interface and -kafka-brokers is supplied on the command line, it drains up to 50 enrichment requests from the property-enrichment Kafka topic before exiting.
```go
type Enricher interface {
    GetCountry() string
    EnrichProperty(ctx context.Context, req EnrichmentRequest) (*Property, error)
}

type EnrichmentRequest struct {
    PropertyID    string
    SourceURL     string
    Source        string
    Country       string
    MissingFields []string
}
```
GetCountry() returns an ISO-3166-1 alpha-2 code (e.g. "NZ", "AU", "MT"). The enrichment consumer uses this to filter messages — messages for other countries are skipped and committed without processing.
EnrichProperty() receives the original listing URL and a list of fields that are known to be missing or stale. The implementation re-scrapes that URL and returns a *Property with fresh data. Returning (nil, nil) is safe — it signals that the property cannot be found or is not handled by this connector.
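In the connector binary, this gating amounts to a type assertion once discovery finishes. A sketch, with the consumer constructor and its config fields assumed rather than taken from the real code:

```go
// Run one enrichment batch only when both conditions hold:
// the connector implements Enricher and -kafka-brokers was supplied.
if enricher, ok := conn.(common.Enricher); ok && *kafkaBrokers != "" {
    consumer := common.NewEnrichmentConsumer(common.EnrichmentConsumerConfig{ // assumed constructor and fields
        Brokers:   strings.Split(*kafkaBrokers, ","),
        Topic:     "property-enrichment",
        BatchSize: 50,
        Enricher:  enricher,
        Submitter: submitter, // whatever already submits properties to Mill during discovery
    })
    if err := consumer.RunBatch(ctx); err != nil {
        log.Printf("enrichment batch failed: %v", err)
    }
}
```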
How Mill Populates the Queue
Mill’s EnrichmentScheduler runs as a background goroutine every hour. Each cycle it:
- Queries the `properties` table for candidates:
  - Priority — properties with missing images, zero price, or zero bedrooms are queued immediately.
  - Staleness — properties whose `updated_at` is older than two days are also included.
- Applies a cooloff — a property is skipped if its `enrichment_queued_at` timestamp was set less than 24 hours ago.
- Publishes one `EnrichmentRequest` message per candidate to the `property-enrichment` Kafka topic, using `property_id` as the message key.
- Stamps `enrichment_queued_at = NOW()` on all published properties.
The enrichment_queued_at column is added by Migration 04 and is NULL for properties that have never been queued.
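For illustration, one scheduler cycle might look roughly like the following sketch; the candidate-check column names, the kafka-go writer wiring, and the reuse of the EnrichmentRequest shape are assumptions, not Mill’s actual code:

```go
// Rough sketch of one EnrichmentScheduler cycle (simplified; MissingFields omitted).
func runEnrichmentCycle(ctx context.Context, db *sql.DB, writer *kafka.Writer) error {
    rows, err := db.QueryContext(ctx, `
        SELECT id, source_url, source, country
        FROM properties
        WHERE (image_count = 0 OR price = 0 OR bedrooms = 0           -- priority (assumed column names)
               OR updated_at < NOW() - INTERVAL '2 days')             -- staleness
          AND (enrichment_queued_at IS NULL
               OR enrichment_queued_at < NOW() - INTERVAL '24 hours') -- cooloff
    `)
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var req EnrichmentRequest
        if err := rows.Scan(&req.PropertyID, &req.SourceURL, &req.Source, &req.Country); err != nil {
            continue
        }
        payload, _ := json.Marshal(req)

        // Key by property_id so all requests for one property land on the same partition.
        if err := writer.WriteMessages(ctx, kafka.Message{Key: []byte(req.PropertyID), Value: payload}); err != nil {
            continue // leave enrichment_queued_at unset so the next cycle retries
        }
        db.ExecContext(ctx, `UPDATE properties SET enrichment_queued_at = NOW() WHERE id = $1`, req.PropertyID)
    }
    return rows.Err()
}
```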
Enrichment Consumer
connectors/common/enrichment_consumer.go wraps a kafka-go Reader in a bounded batch mode — it reads up to N messages (default 50) with a short read deadline (default 5 s), processes them, then returns. It does not run as a long-lived loop; the CronJob provides the cadence.
```go
// Simplified flow inside RunBatch
for processed < cfg.BatchSize {
    msg, err := reader.FetchMessage(readCtx) // short deadline
    if err != nil {
        break // timeout = done
    }

    var req EnrichmentRequest
    if err := json.Unmarshal(msg.Value, &req); err != nil {
        reader.CommitMessages(ctx, msg) // unparsable message, skip it
        continue
    }

    if req.Country != enricher.GetCountry() {
        reader.CommitMessages(ctx, msg) // skip, not our country
        continue
    }

    enriched, _ := enricher.EnrichProperty(ctx, req)
    if enriched != nil {
        submitter.SubmitProperty(*enriched)
    }
    reader.CommitMessages(ctx, msg)
    processed++
}
```
Offsets are committed only after the submit attempt. If EnrichProperty returns an error the message is still committed to avoid tight retry loops — Mill’s deduplication and validation layers guard against corrupt data.
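The Reader behind this batch mode is plain kafka-go. A minimal setup consistent with the defaults above might look like this (the group id and config variable names are assumptions):

```go
// Consumer-group reader for the bounded enrichment batch.
reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers: cfg.Brokers,            // e.g. []string{"redpanda:9092"}
    GroupID: "connector-enrichment", // assumed group id
    Topic:   "property-enrichment",
})
defer reader.Close()

// Give each fetch a short deadline so an empty topic ends the batch quickly
// instead of blocking the CronJob pod.
readCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
msg, err := reader.FetchMessage(readCtx)
```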
Run Sequence
```
connector invocation
│
├─ 1. Discovery
│     ScrapeProperties(opts) → submit to Mill
│
└─ 2. Enrichment batch (only if connector implements Enricher AND -kafka-brokers is set)
      EnrichmentConsumer.RunBatch(ctx)
      ├─ read up to 50 msgs from "property-enrichment"
      ├─ filter by GetCountry()
      ├─ EnrichProperty(req) → submit to Mill
      └─ return (exit)
```
Implementing Enricher on a Connector
Add GetCountry() and EnrichProperty() to an existing connector struct. EnrichProperty() should reuse the connector’s existing single-page scraping logic, targeting the specific req.SourceURL instead of a paginated search URL.
```go
func (c *MyConnector) GetCountry() string { return "XX" }

func (c *MyConnector) EnrichProperty(ctx context.Context, req common.EnrichmentRequest) (*common.Property, error) {
    if req.SourceURL == "" || !strings.Contains(req.SourceURL, "mysite.com") {
        return nil, nil
    }
    detail, err := c.scrapeDetailPage(req.SourceURL)
    if err != nil {
        return nil, err
    }
    return detail, nil
}
```
Return (nil, nil) when the URL is unrecognised — the consumer commits the offset and moves on.
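To exercise the method without Kafka, you can hand it a request directly. A hypothetical snippet (all values are made up):

```go
// Build a request by hand and call EnrichProperty directly, bypassing the consumer.
req := common.EnrichmentRequest{
    PropertyID:    "example-id",
    SourceURL:     "https://mysite.com/listing/12345",
    Source:        "mysite",
    Country:       "XX",
    MissingFields: []string{"images", "price"},
}
prop, err := (&MyConnector{}).EnrichProperty(context.Background(), req)
switch {
case err != nil:
    log.Fatalf("enrich failed: %v", err)
case prop == nil:
    log.Println("listing not handled by this connector")
default:
    log.Printf("re-scraped property %s", req.PropertyID)
}
```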
Connectors with Enrichment Support
| Connector | Country | Source |
|---|---|---|
| homes-co-nz | NZ | homes.co.nz |
| realestate-au | AU | realestate.com.au |
| domain-au | AU | domain.com.au |
| homely-au | AU | homely.com.au |
| allhomes-au | AU | allhomes.com.au |
| property-au | AU | property.com.au |
| view-au | AU | view.com.au |
| onthehouse-au | AU | onthehouse.com.au |
| rent-au | AU | rent.com.au |
| reiwa-au | AU | reiwa.com.au |
| realcommercial-au | AU | realcommercial.com.au |
| commercialrealestate-au | AU | commercialrealestate.com.au |
| maltapark | MT | maltapark.com |
| rightmove-co-uk | GB | rightmove.co.uk |
| vivareal-br | BR | vivareal.com.br |
| zonaprop-ar | AR | zonaprop.com.ar |
| lamudi-mx | MX | lamudi.com.mx |
| realtor-ca | CA | realtor.ca |
Enabling Enrichment on a CronJob
Add -kafka-brokers to an existing CronJob’s args. No new jobs are required.
```yaml
args:
  - "-connector"
  - "homes-co-nz"
  - "-mill-api"
  - "http://mill:4000"
  - "-kafka-brokers"   # adds enrichment phase
  - "redpanda:9092"
  - "-max"
  - "2000"
```
Connectors that do not implement Enricher silently skip the enrichment phase even when -kafka-brokers is supplied.
MaltaPark Connector
connectors/sources/maltapark/ is the reference implementation for a connector that ships with both Connector and Enricher from the start.
- Discovery scrapes the for-sale and for-rent category pages on `maltapark.com`, collects listing anchor links, visits each detail page, and extracts address, EUR price, bedrooms, bathrooms, images, and description.
- Enrichment re-visits a known listing URL to refresh any of those fields.
- Country code: `MT` (Malta).
- Currency: EUR.
- CronJob: `k8s/connectors/maltapark-connector-cronjob.yaml`, scheduled hourly at minute 30.
Data Flow Diagram
```
Mill EnrichmentScheduler (hourly)
│
├─ SELECT candidates WHERE
│    (missing images/price/rooms OR updated_at < now-2d)
│    AND enrichment_queued_at < now-1d
│
├─ PUBLISH to "property-enrichment" topic
│    key = property_id
│
└─ UPDATE enrichment_queued_at = now

Connector CronJob (e.g. hourly)
│
├─ Discovery: scrape → POST /connectors/properties/batch
│
└─ Enrichment: consume "property-enrichment"
   ├─ filter Country == GetCountry()
   ├─ EnrichProperty(sourceURL)
   └─ POST /connectors/properties/single
```