Parent

Mongo::CollectionWriter

Public Class Methods

new(collection) click to toggle source
# File lib/mongo/collection_writer.rb, line 32
def initialize(collection)
  @collection = collection
  @name = @collection.name
  @db = @collection.db
  @connection = @db.connection
  @logger     = @connection.logger
  @max_write_batch_size = Mongo::MongoClient::DEFAULT_MAX_WRITE_BATCH_SIZE
end

Public Instance Methods

batch_write(op_type, documents, check_keys=true, opts={}) click to toggle source
batch_write_incremental(op_type, documents, check_keys=true, opts={}) click to toggle source

common implementation only for new batch write commands (insert, update, delete) and old batch insert

# File lib/mongo/collection_writer.rb, line 42
def batch_write_incremental(op_type, documents, check_keys=true, opts={})
  raise Mongo::OperationFailure, "Request contains no documents" if documents.empty?
  write_concern = get_write_concern(opts, @collection)
  max_message_size, max_append_size, max_serialize_size = batch_write_max_sizes(write_concern)
  ordered = opts[:ordered]
  continue_on_error = !!opts[:continue_on_error] || ordered == false
  collect_on_error = !!opts[:collect_on_error] || ordered == false
  error_docs = [] # docs with serialization errors
  errors = []
  write_concern_errors = []
  exchanges = []
  serialized_doc = nil
  message = BSON::ByteBuffer.new("", max_message_size)
  @max_write_batch_size = @collection.db.connection.max_write_batch_size
  docs = documents.dup
  catch(:error) do
    until docs.empty? || (!errors.empty? && !collect_on_error) # process documents a batch at a time
      batch_docs = []
      batch_message_initialize(message, op_type, continue_on_error, write_concern)
      while !docs.empty? && batch_docs.size < @max_write_batch_size
        begin
          doc = docs.first
          doc = doc[:d] if op_type == :insert && !ordered.nil? #check_keys for :update outside of serialize
          serialized_doc ||= BSON::BSON_CODER.serialize(doc, check_keys, true, max_serialize_size)
        rescue BSON::InvalidDocument, BSON::InvalidKeyName, BSON::InvalidStringEncoding => ex
          bulk_message = "Bulk write error - #{ex.message} - examine result for complete information"
          ex = BulkWriteError.new(bulk_message, Mongo::ErrorCode::INVALID_BSON,
                                  {:op_type => op_type, :serialize => doc, :ord => docs.first[:ord], :error => ex}) unless ordered.nil?
          error_docs << docs.shift
          errors << ex
          next if collect_on_error
          throw(:error) if batch_docs.empty?
          break # defer exit and send batch
        end
        break if message.size + serialized_doc.size > max_append_size
        batch_docs << docs.shift
        batch_message_append(message, serialized_doc, write_concern)
        serialized_doc = nil
      end
      begin
        response = batch_message_send(message, op_type, batch_docs, write_concern, continue_on_error) if batch_docs.size > 0
        exchanges << {:op_type => op_type, :batch => batch_docs, :opts => opts, :response => response}
      rescue Mongo::WriteConcernError => ex
        write_concern_errors << ex
        exchanges << {:op_type => op_type, :batch => batch_docs, :opts => opts, :response => ex.result}
      rescue Mongo::OperationFailure => ex
        errors << ex
        exchanges << {:op_type => op_type, :batch => batch_docs, :opts => opts, :response => ex.result}
        throw(:error) unless continue_on_error
      end
    end
  end
  [error_docs, errors, write_concern_errors, exchanges]
end
Also aliased as: batch_write
batch_write_partition(op_type, documents, check_keys, opts) click to toggle source
# File lib/mongo/collection_writer.rb, line 97
def batch_write_partition(op_type, documents, check_keys, opts)
  raise Mongo::OperationFailure, "Request contains no documents" if documents.empty?
  write_concern = get_write_concern(opts, @collection)
  ordered = opts[:ordered]
  continue_on_error = !!opts[:continue_on_error] || ordered == false # continue_on_error default false
  collect_on_error = !!opts[:collect_on_error] # collect_on_error default false
  error_docs = [] # docs with serialization errors
  errors = []
  write_concern_errors = []
  exchanges = []
  @max_write_batch_size = @collection.db.connection.max_write_batch_size
  @write_batch_size = [documents.size, @max_write_batch_size].min
  docs = documents.dup
  until docs.empty?
    batch = docs.take(@write_batch_size)
    begin
      batch_to_send = batch #(op_type == :insert && !ordered.nil?) ? batch.collect{|doc|doc[:d]} : batch
      if @collection.db.connection.use_write_command?(write_concern) # TODO - polymorphic send_write including legacy insert
        response = send_bulk_write_command(op_type, batch_to_send, check_keys, opts)
      else
        response = send_write_operation(op_type, nil, batch_to_send, check_keys, opts, write_concern)
      end
      exchanges << {:op_type => op_type, :batch => batch, :opts => opts, :response => response}
      docs = docs.drop(batch.size)
      @write_batch_size = [(@write_batch_size*1097) >> 10, @write_batch_size+1].max unless docs.empty? # 2**(1/10) multiplicative increase
      @write_batch_size = @max_write_batch_size if @write_batch_size > @max_write_batch_size
    rescue BSON::InvalidDocument, BSON::InvalidKeyName, BSON::InvalidStringEncoding => ex
      if @write_batch_size > 1 # decrease batch size
        @write_batch_size = (@write_batch_size+1) >> 1 # 2**(-1) multiplicative decrease
        next
      end
      # error on a single document
      bulk_message = "Bulk write error - #{ex.message} - examine result for complete information"
      ex = BulkWriteError.new(bulk_message, Mongo::ErrorCode::INVALID_BSON,
                              {:op_type => op_type, :batch => batch, :ord => batch.first[:ord], :opts => opts, :error => ex}) unless ordered.nil?
      error_docs << docs.shift
      next if collect_on_error
      errors << ex
      break unless continue_on_error
    rescue Mongo::WriteConcernError => ex
      write_concern_errors << ex
      exchanges << {:op_type => op_type, :batch => batch_docs, :opts => opts, :response => ex.result}
      docs = docs.drop(batch.size)
    rescue Mongo::OperationFailure => ex
      errors << ex
      exchanges << {:op_type => op_type, :batch => batch, :opts => opts, :response => ex.result}
      docs = docs.drop(batch.size)
      break if !continue_on_error && !collect_on_error
    end
  end
  [error_docs, errors, write_concern_errors, exchanges]
end
send_bulk_write_command(op_type, documents, check_keys, opts, collection_name=@name) click to toggle source
# File lib/mongo/collection_writer.rb, line 152
def send_bulk_write_command(op_type, documents, check_keys, opts, collection_name=@name)
  if op_type == :insert
    documents = documents.collect{|doc| doc[:d]} if opts.key?(:ordered)
    documents.each do |doc|
      # TODO - @pk_factory.create_pk(doc)
      if check_keys
        doc.each_key do |key|
          key = key.to_s
          raise BSON::InvalidKeyName.new("key #{key} must not start with '$'") if key[0] == $$
          raise BSON::InvalidKeyName.new("key #{key} must not contain '.'") if key.include? ..
        end
      end
    end
  #elsif op_type == :update # TODO - check keys
  #elsif op_type == :delete
  #else
  #  raise ArgumentError, "Write operation type must be :insert, :update or :delete"
  end
  request = BSON::OrderedHash[op_type, collection_name].merge!(
      Mongo::CollectionWriter::WRITE_COMMAND_ARG_KEY[op_type] => documents,
      :writeConcern => get_write_concern(opts, @collection),
      :ordered => opts[:ordered] || !opts[:continue_on_error]
  )
  @db.command(request)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.