# Harvester Sources & Enrichers
The harvester workspace is split into two complementary systems:
- Sources scrape fresh listings from third-party websites and push normalized payloads into Mill.
- Enrichers revisit properties already stored in Mill, augmenting them with better metadata, imagery, coordinates, or external valuations.
Understanding how these layers cooperate is essential when introducing a new data source or tightening data quality on an existing region.
## Source Harvesters (harvesters/sources)

Source harvesters are self-contained packages (for example `sources/homes-co-nz/`) that implement the shared `common.Harvester` contract. Each harvester receives runtime options, performs HTTP collection, converts raw listings into the standardized property shape, and hands the batch to the Mill API.

```go
type Harvester interface {
	GetName() string
	GetSource() string
	ScrapeProperties(opts HarvesterOptions) ([]Property, error)
	GetStats() HarvesterStats
	SetRateLimit(delay time.Duration)
	HealthCheck() error
}
```

### Responsibilities

- Uniform CLI surface – the unified binary injects `HarvesterOptions` (location, pagination, rate limits, Mill credentials) into every source at runtime, which keeps usage consistent independent of the underlying website logic.

  ```go
  type HarvesterOptions struct {
  	Location      string
  	Pages         int
  	MaxProperties int
  	Delay         time.Duration
  	Verbose       bool
  	DryRun        bool
  	Output        string
  	MillAPI       string
  	MillAPIKey    string
  	MillBatch     bool
  	HealthCheck   bool
  }
  ```

- Canonical data mapping – harvesters must emit `common.Property`, so names, prices, media, and geo information flow into Mill in a single schema regardless of source idiosyncrasies.

  ```go
  type Property struct {
  	SourceID       string
  	Source         string
  	Address        string
  	Price          int
  	PriceCurrency  string
  	Bedrooms       int
  	Bathrooms      float64
  	ImageURLs      []string
  	AgentName      string
  	ListingDate    time.Time
  	SourceURL      string
  	CollectionTime time.Time
  	Region         string
  	Latitude       float64
  	Longitude      float64
  	Features       []string
  }
  ```

- Mill submission flow – source harvesters rely on `common.MillClient` to batch-submit normalized records to `/harvesters/properties/batch`, reusing Mill’s validation layer.
### Typical lifecycle

- Input parsing – the CLI reads flags/config, instantiates the requested harvester, and sets per-source rate limits via `SetRateLimit`.
- Collection – the harvester requests HTML/JSON pages from the upstream site, respecting delays from `HarvesterOptions.Delay`.
- Normalization – raw payloads get trimmed, deduplicated, and mapped into `common.Property`.
- Validation & submission – if `-dry-run` is disabled, harvested properties are sent to Mill, and the harvester updates its `HarvesterStats` for observability.
- Health checks – `HealthCheck()` implementations probe lightweight endpoints so we can quickly determine whether a source is blocked, throttled, or structurally broken.
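Stitched together, the lifecycle can be sketched as a single driver function. This is a simplification, not the real binary: `runHarvester`, `submitFn`, and `stubHarvester` are hypothetical names, and the Mill client is reduced to a callback.

```go
package main

import (
	"fmt"
	"time"
)

// Minimal stand-ins for the shared types; see harvesters/common for the real ones.
type Property struct{ SourceID, Address string }

type HarvesterOptions struct {
	Location string
	Delay    time.Duration
	DryRun   bool
}

type Harvester interface {
	GetName() string
	ScrapeProperties(opts HarvesterOptions) ([]Property, error)
	SetRateLimit(delay time.Duration)
	HealthCheck() error
}

// submitFn stands in for common.MillClient's batch submission.
type submitFn func([]Property) error

// runHarvester mirrors the lifecycle described above: rate limit,
// health check, collect, then either skip (dry run) or submit.
func runHarvester(h Harvester, opts HarvesterOptions, submit submitFn) (int, error) {
	h.SetRateLimit(opts.Delay)
	if err := h.HealthCheck(); err != nil {
		return 0, fmt.Errorf("%s health check failed: %w", h.GetName(), err)
	}
	props, err := h.ScrapeProperties(opts)
	if err != nil {
		return 0, err
	}
	if opts.DryRun {
		return len(props), nil // skip Mill submission entirely
	}
	return len(props), submit(props)
}

// stubHarvester lets the sketch run without a real website.
type stubHarvester struct{ delay time.Duration }

func (s *stubHarvester) GetName() string { return "stub" }
func (s *stubHarvester) ScrapeProperties(opts HarvesterOptions) ([]Property, error) {
	return []Property{{SourceID: "1", Address: opts.Location}}, nil
}
func (s *stubHarvester) SetRateLimit(d time.Duration) { s.delay = d }
func (s *stubHarvester) HealthCheck() error           { return nil }

func main() {
	submitted := 0
	n, err := runHarvester(&stubHarvester{}, HarvesterOptions{Location: "Wellington"}, func(ps []Property) error {
		submitted += len(ps)
		return nil
	})
	fmt.Println(n, submitted, err)
}
```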
### When to build a new source

Add a new source when you need fresh listings we do not already ingest. Create `sources/<domain>/harvester.go`, implement the interface above, register it in `main.go`, and lean on the shared components under `harvesters/common` for token acquisition, HTTP helpers, and schema validation.
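Registration can be pictured as a name-to-constructor map keyed by the CLI source name. This is a sketch of the idea, not the actual `main.go` wiring; `registry` and `newHarvester` are hypothetical names.

```go
package main

import "fmt"

// Reduced interface for the sketch; the real one is common.Harvester.
type Harvester interface{ GetName() string }

type nzSource struct{}

func (nzSource) GetName() string { return "homes-co-nz" }

// registry maps the CLI source name to a constructor; adding a new
// source means one more entry here plus its package under sources/.
var registry = map[string]func() Harvester{
	"homes-co-nz": func() Harvester { return nzSource{} },
}

func newHarvester(name string) (Harvester, error) {
	ctor, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("unknown source %q", name)
	}
	return ctor(), nil
}

func main() {
	h, err := newHarvester("homes-co-nz")
	fmt.Println(h.GetName(), err)
}
```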
## Harvester Enrichers (harvesters/enrichers)

Enrichers operate on the existing Mill corpus. Instead of scraping new listings, they:

- Page through Mill’s `/properties` endpoint using the enrichment Mill client.
- Apply filters/validators before touching external systems.
- Call downstream APIs (valuation services, geocoders, MLS mirrors, etc.).
- Merge the enriched fields back into a `common.Property`.
- Submit that property through the same harvester endpoint so Mill performs full validation again.
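The list/enrich/resubmit loop can be sketched as follows. `enrichAll` and its callbacks are illustrative stand-ins for the enrichment Mill client and service, not real API names.

```go
package main

import "fmt"

// Stand-in for the Mill property payload.
type Property struct {
	ID      string
	Address string
	Region  string
}

// enrichFn applies filters and external lookups; returning nil means "skip".
type enrichFn func(Property) *Property

// enrichAll mirrors the enricher flow: list a page, filter/enrich each
// record, and resubmit. listPage and submit stand in for the Mill
// client's /properties paging and harvester-endpoint submission.
func enrichAll(listPage func(offset, limit int) []Property, enrich enrichFn, submit func(Property) error, limit int) (int, error) {
	submitted := 0
	for offset := 0; ; offset += limit {
		batch := listPage(offset, limit)
		if len(batch) == 0 {
			return submitted, nil
		}
		for _, p := range batch {
			enriched := enrich(p)
			if enriched == nil {
				continue // filtered out before any external call
			}
			if err := submit(*enriched); err != nil {
				return submitted, err
			}
			submitted++
		}
		if len(batch) < limit {
			return submitted, nil // last page
		}
	}
}

func main() {
	corpus := []Property{{ID: "1", Region: "Auckland"}, {ID: "2", Region: "Sydney"}, {ID: "3", Region: "Wellington"}}
	listPage := func(offset, limit int) []Property {
		if offset >= len(corpus) {
			return nil
		}
		end := offset + limit
		if end > len(corpus) {
			end = len(corpus)
		}
		return corpus[offset:end]
	}
	nzOnly := func(p Property) *Property {
		if p.Region == "Sydney" {
			return nil // outside the target region
		}
		return &p
	}
	n, err := enrichAll(listPage, nzOnly, func(Property) error { return nil }, 2)
	fmt.Println(n, err) // 2 of the 3 pass the NZ filter
}
```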
The orchestration pattern is shared:
- Scheduler – pulls properties in batches and fans them out to workers with bounded concurrency.
- Enrichment Service – wraps the business rules (filtering, normalization, merging).
- Mill Client – lists and re-submits properties, fetching service tokens as needed.
- External Clients – domain-specific fetchers (e.g., Homes.co.nz, OpenStreetMap, valuation APIs).
This separation lets us run enrichers continuously (e.g., via Cloud Run or a CronJob) without interfering with first-party scrapers.
## homes-co-nz Enricher Deep Dive

The `harvesters/enrichers/homes-co-nz` service is the canonical example of the enrichment architecture. It continuously refreshes New Zealand listings with address validation, coordinates, and data scraped from homes.co.nz itself.
### Scheduler-driven ingestion

`internal/scheduler` pages through Mill using configured batch sizes, offsets, and worker pools. Each batch is processed concurrently but capped by `worker_count`, so we respect both Mill and homes.co.nz rate limits.

```go
func (s *Scheduler) RunOnce(ctx context.Context) error {
	resp, err := s.mill.ListProperties(ctx, millclient.ListParams{
		Limit:  s.batchSize,
		Offset: offset,
	})
	// ...
	return s.processBatch(ctx, resp.Properties)
}
```
### NZ-only filtering & normalization

Each property is mapped into the standard schema, then skipped if it is outside New Zealand. Detection uses country codes, bounding boxes, and city-name heuristics so we do not waste requests on non-target regions. Afterwards, core fields (address, price, currency, property type, images, and features) are validated and normalized before any external calls are attempted.

```go
func (s *Service) Enrich(ctx context.Context, prop millclient.Property) (*common.Property, error) {
	base := mapMillProperty(prop)
	if !isNewZealandProperty(prop) {
		return nil, nil
	}
	if err := s.validateAndNormalize(base); err != nil {
		logger.Warn(...)
	}
	// ...
	return base, nil
}

func isNewZealandProperty(prop millclient.Property) bool {
	// Country code, coordinate bounds, and NZ keyword checks
}

func (s *Service) validateAndNormalize(prop *common.Property) error {
	// Address trimming, price/currency fixes, image dedupe, feature cleanup, region normalization
}
```
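Two of the detection layers can be sketched concretely. The bounding box and keyword list below are assumptions for illustration; the enricher's actual bounds and heuristics may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// isNewZealandCoordinate sketches the coordinate-bounds check with a
// rough bounding box around New Zealand's main islands.
func isNewZealandCoordinate(lat, lon float64) bool {
	return lat >= -47.5 && lat <= -34.0 && lon >= 166.0 && lon <= 179.0
}

// hasNZKeyword sketches the city-name heuristic layered on top.
func hasNZKeyword(address string) bool {
	lower := strings.ToLower(address)
	for _, kw := range []string{"new zealand", "auckland", "wellington", "christchurch"} {
		if strings.Contains(lower, kw) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isNewZealandCoordinate(-36.85, 174.76)) // Auckland: true
	fmt.Println(isNewZealandCoordinate(-33.87, 151.21)) // Sydney: false
	fmt.Println(hasNZKeyword("12 Lambton Quay, Wellington"))
}
```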
### Geocoding fallback

When lat/lon are missing, the service calls the Mill-hosted geocoding API. The client is configurable (`geocoding.enabled`, base URL, timeout) and only fires once per property to reduce external load.

```go
if s.geocodingClient != nil && base.Latitude == 0 {
	address := NormalizeNZAddress(chooseAddress(prop))
	lat, lon, err := s.geocodingClient.Geocode(ctx, address)
	// ...
	base.Latitude = lat
	base.Longitude = lon
}

func (c *Client) Geocode(ctx context.Context, address string) (float64, float64, error) {
	// POST /api/v1/geocode with validation and logging
}
```
### Homes.co.nz search & scraping

For properties that qualify, the enricher tries multiple address permutations combined with the latest coordinates to locate the matching homes.co.nz entry. The Homes client enforces its own rate limiting, hits the gateway address/search endpoint, and then scrapes the property page for embedded JSON state containing beds, baths, media, and metadata.

```go
details := s.tryHomesSearch(ctx, prop, base)
// Iterates address variations, logs attempts, validates matches, and returns PropertyData

func (c *Client) FetchDetails(ctx context.Context, query Query) (*PropertyData, error) {
	// Gateway search → property URL construction → homes-app-state scrape
}
```
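The address-permutation idea can be sketched as trying the full address first and then progressively dropping trailing components. `addressVariations` is an illustrative helper; the real enricher's variations may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// addressVariations returns search candidates from most to least
// specific, deduplicated, so the first match wins.
func addressVariations(full string) []string {
	parts := strings.Split(full, ",")
	for i := range parts {
		parts[i] = strings.TrimSpace(parts[i])
	}
	seen := map[string]bool{}
	var out []string
	// Longest (most specific) first, then drop trailing components.
	for n := len(parts); n >= 1; n-- {
		v := strings.Join(parts[:n], ", ")
		if v != "" && !seen[v] {
			seen[v] = true
			out = append(out, v)
		}
	}
	return out
}

func main() {
	for _, v := range addressVariations("12 Example Street, Ponsonby, Auckland") {
		fmt.Println(v)
	}
}
```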
### Merge heuristics & Mill submission

Once Homes data is available, the enricher applies configurable merge rules: optionally replacing empty core fields, preferring Homes images, capping total images, and appending valuation metadata. Finally, it submits the enriched record back through Mill’s harvester endpoint using the enrichment Mill client.

```go
func mergeHomesData(base *common.Property, details *homesclient.PropertyData, cfg cfg.EnrichmentConfig) {
	// Replace empties, merge images with dedupe, append metadata + valuation confidence
}

func (c *Client) SubmitEnrichedProperty(ctx context.Context, property common.Property) error {
	// Wraps writer.SubmitProperty with context cancellation
}
```
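The image-merge rule (prefer Homes, dedupe by URL, cap the total) can be sketched as a pure function. The parameter names here are illustrative, not the real `EnrichmentConfig` fields.

```go
package main

import "fmt"

// mergeImages sketches the merge rule: optionally put Homes images
// first, drop duplicate URLs, and cap the result length.
func mergeImages(existing, homes []string, preferHomes bool, maxImages int) []string {
	ordered := append(append([]string{}, existing...), homes...)
	if preferHomes {
		ordered = append(append([]string{}, homes...), existing...)
	}
	seen := map[string]bool{}
	var out []string
	for _, url := range ordered {
		if url == "" || seen[url] {
			continue // skip blanks and duplicates
		}
		seen[url] = true
		out = append(out, url)
		if len(out) == maxImages {
			break
		}
	}
	return out
}

func main() {
	merged := mergeImages(
		[]string{"a.jpg", "b.jpg"},
		[]string{"b.jpg", "c.jpg", "d.jpg"},
		true, 3)
	fmt.Println(merged) // [b.jpg c.jpg d.jpg]
}
```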
### Configuration knobs

All scheduler cadence, concurrency, merge heuristics, and downstream clients are described in `config.example.yaml` and loaded via the typed `config` package. Defaults include a 15-minute interval, 50-record batches, automatic Mill token acquisition, optional Homes scraping (disabled if `homes_co_nz.base_url` is blank), and geocoding enabled.

```go
type Config struct {
	Service    ServiceConfig
	Mill       MillConfig
	Scheduler  SchedulerConfig
	Homes      HomesConfig
	Enrichment EnrichmentConfig
	Geocoding  GeocodingConfig
}

// Includes defaults for intervals, worker counts, rate limits, image caps, etc.
```
### Modes of operation

- Validation-only – leave `homes_co_nz.base_url` empty to skip external scraping; the service will still normalize NZ listings, fix images, geocode, and resubmit improved data.
- Full enrichment – provide a Homes base URL and optional API key to unlock scraping. The scheduler then augments listings with value estimates, more imagery, and richer metadata while observing the 2s default rate limit defined in the Homes client.
Together, these stages ensure the homes-co-nz enricher continuously lifts Mill’s New Zealand data quality without duplicating scraper logic or introducing inconsistent schemas. Use it as the reference implementation whenever you need to build enrichers for other regions or datasets.